[1/3] couch commit: updated refs/heads/1843-feature-bigcouch to d04839e

Repository: couchdb-couch
Updated Branches:
  refs/heads/1843-feature-bigcouch b13052435 -> d04839eaf

Add hard limit for OS processes

Prior to this commit, there was only a soft OS process cap in BigCouch.
This commit adds a hard process cap, in addition to the soft process
cap. When the hard process cap is reached, all requests for additional
processes are blocked until a process becomes available. The soft OS
process limit was left in place, so if the number of OS processes exceeds
the soft limit (and some of the processes are idle for a certain amount
of time), Couch will shut down some of the processes.

Also, the soft OS process cap previously limited the total number of OS
processes rather than the number of OS processes associated with a
particular language. It is now applied per language in order to be
consistent with CouchDB's per-language hard cap.


Branch: refs/heads/1843-feature-bigcouch
Commit: 135dd706a79f124d0707d9a5ddec2ae587faeff7
Parents: b130524
Author: Benjamin Bastian <>
Authored: Thu Feb 13 14:50:11 2014 -0800
Committer: Benjamin Bastian <>
Committed: Mon Feb 17 10:45:43 2014 -0800

 src/couch_proc_manager.erl | 212 +++++++++++++++++++++++++++-------------
 1 file changed, 144 insertions(+), 68 deletions(-)
diff --git a/src/couch_proc_manager.erl b/src/couch_proc_manager.erl
index 8027e76..5dd000b 100644
--- a/src/couch_proc_manager.erl
+++ b/src/couch_proc_manager.erl
@@ -17,7 +17,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, 
--export([start_link/0, get_proc_count/0, new_proc/2, new_proc/4]).
+-export([start_link/0, get_proc_count/0, new_proc/1]).
 % config_listener api
@@ -26,7 +26,17 @@
 -record(state, {
-    config
+    config,
+    proc_counts,
+    waiting
+-record(client, {
+    timestamp,
+    from,
+    lang,
+    ddoc,
+    ddoc_key
 start_link() ->
@@ -40,7 +50,10 @@ init([]) ->
     ok = config:listen_for_changes(?MODULE, nil),
     {ok, #state{
         tab = ets:new(procs, [ordered_set, {keypos,}]),
-        config = get_proc_config()
+        config = get_proc_config(),
+        proc_counts = dict:new(),
+        waiting = ets:new(couch_proc_manage_waiting,
+                [ordered_set, {keypos, #client.timestamp}])
 handle_call(get_table, _From, State) ->
@@ -50,12 +63,13 @@ handle_call(get_proc_count, _From, State) ->
     {reply, ets:info(, size), State};
 handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
-    {Client, _} = From,
-    Lang = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+    {ClientPid, _} = From,
+    Lang = couch_util:to_binary(
+            couch_util:get_value(<<"language">>, Props, <<"javascript">>)),
     IterFun = fun(Proc, Acc) ->
         case lists:member(DDocKey, Proc#proc.ddoc_keys) of
             true ->
-                {stop, assign_proc(, Client, Proc)};
+                {stop, assign_proc(, ClientPid, Proc)};
             false ->
                 {ok, Acc}
@@ -63,60 +77,41 @@ handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
     TeachFun = fun(Proc0, Acc) ->
             {ok, Proc1} = teach_ddoc(DDoc, DDocKey, Proc0),
-            {stop, assign_proc(, Client, Proc1)}
+            {stop, assign_proc(, ClientPid, Proc1)}
         catch _:_ ->
             {ok, Acc}
-    try iter_procs(, Lang, IterFun, nil) of
-    {not_found, _} ->
-        case iter_procs(, Lang, TeachFun, nil) of
-        {not_found, _} ->
-            spawn_link(?MODULE, new_proc, [From, Lang, DDoc, DDocKey]),
-            {noreply, State};
-        {ok, Proc} ->
-            {reply, {ok, Proc, State#state.config}, State}
-        end;
-    {ok, Proc} ->
-        {reply, {ok, Proc, State#state.config}, State}
-    catch error:Reason ->
-        ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
-        {reply, {error, Reason}, State}
-    end;
+    Client = #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey},
+    find_proc(State, Client, [IterFun, TeachFun]);
-handle_call({get_proc, Lang}, {Client, _} = From, State) ->
+handle_call({get_proc, Lang}, From, State) ->
+    {ClientPid, _} = From,
     IterFun = fun(Proc, _Acc) ->
-        {stop, assign_proc(, Client, Proc)}
+        {stop, assign_proc(, ClientPid, Proc)}
-    try iter_procs(, Lang, IterFun, nil) of
-    {not_found, _} ->
-        spawn_link(?MODULE, new_proc, [From, Lang]),
-        {noreply, State};
-    {ok, Proc} ->
-        {reply, {ok, Proc, State#state.config}, State}
-    catch error:Reason ->
-        ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
-        {reply, {error, Reason}, State}
-    end;
+    Client = #client{from=From, lang=couch_util:to_binary(Lang)},
+    find_proc(State, Client, [IterFun]);
-handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) ->
+handle_call({ret_proc, #proc{client=Ref, lang=Lang0} = Proc}, _From, State) ->
     erlang:demonitor(Ref, [flush]),
+    Lang = couch_util:to_binary(Lang0),
     % We need to check if the process is alive here, as the client could be
     % handing us a #proc{} with a dead one.  We would have already removed the
     % #proc{} from our own table, so the alternative is to do a lookup in the
     % table before the insert.  Don't know which approach is cheaper.
-    return_proc(, Proc),
-    {reply, true, State};
+    {reply, true, return_proc(State, Proc#proc{lang=Lang})};
 handle_call(_Call, _From, State) ->
     {reply, ignored, State}.
-handle_cast({os_proc_idle, Pid}, #state{tab=Tab}=State) ->
-    Limit = config:get("query_server_config", "os_process_soft_limit", "100"),
-    case ets:lookup(Tab, Pid) of
-        [#proc{client=nil}] ->
-            case ets:info(Tab, size) > list_to_integer(Limit) of
-                true ->
+handle_cast({os_proc_idle, Pid}, #state{tab=Tab, proc_counts=Counts}=State0) ->
+    Limit = list_to_integer(
+            config:get("query_server_config", "os_process_soft_limit", "100")),
+    State = case ets:lookup(Tab, Pid) of
+        [#proc{client=nil, lang=Lang}] ->
+            case dict:find(Lang, Counts) of
+                {ok, Count} when Count > Limit ->
                     ?LOG_INFO("Closing idle OS Process: ~p", [Pid]),
                     ets:delete(Tab, Pid),
                     case is_process_alive(Pid) of
@@ -125,12 +120,15 @@ handle_cast({os_proc_idle, Pid}, #state{tab=Tab}=State) ->
                             gen_server:cast(Pid, stop);
                         _ ->
-                    end;
-                _ ->
-                    ok
+                    end,
+                    State0#state{
+                        proc_counts=dict:update_counter(Lang, -1, Counts)
+                    };
+                {ok, _} ->
+                    State0
         _ ->
-            ok
+            State0
     {noreply, State};
 handle_cast(reload_config, State) ->
@@ -142,25 +140,39 @@ handle_cast(_Msg, State) ->
 handle_info(shutdown, State) ->
     {stop, shutdown, State};
-handle_info({'EXIT', _, {ok, Proc0, {Client,_} = From}}, State) ->
+handle_info({'EXIT', _, {ok, Proc0, {ClientPid,_} = From}}, State) ->
-    Proc = assign_proc(, Client, Proc0),
+    Proc = assign_proc(, ClientPid, Proc0),
     gen_server:reply(From, {ok, Proc, State#state.config}),
     {noreply, State};
 handle_info({'EXIT', Pid, Reason}, State) ->
+    #state{proc_counts=Counts, waiting=Waiting} = State,
     ?LOG_INFO("~p ~p died ~p", [?MODULE, Pid, Reason]),
+    MaybeProc = ets:lookup(, Pid),
     ets:delete(, Pid),
-    {noreply, State};
+    case MaybeProc of
+        [#proc{lang=Lang}] ->
+            case get_waiting_client(Waiting, Lang) of
+                nil ->
+                    {noreply, State#state{
+                        proc_counts=dict:update_counter(Lang, -1, Counts)
+                    }};
+                Client ->
+                    spawn_link(?MODULE, new_proc, [Client]),
+                    {noreply, State}
+            end;
+        [] ->
+            {noreply, State}
+    end;
-handle_info({'DOWN', Ref, _, _, _Reason}, State) ->
-    case ets:match_object(, #proc{client=Ref, _='_'}) of
+handle_info({'DOWN', Ref, _, _, _Reason}, State0) ->
+    case ets:match_object(, #proc{client=Ref, _='_'}) of
     [] ->
-        ok;
+        {noreply, State0};
     [#proc{} = Proc] ->
-        return_proc(, Proc)
-    end,
-    {noreply, State};
+        {noreply, return_proc(State0, Proc)}
+    end;
 handle_info({gen_event_EXIT, {config_listener, ?MODULE}, _Reason}, State) ->
     erlang:send_after(5000, self(), restart_config_listener),
@@ -189,8 +201,8 @@ handle_config_change("query_server_config", _, _, _, _) ->
 handle_config_change(_, _, _, _, _) ->
     {ok, nil}.
-iter_procs(Tab, Lang, Fun, Acc) when is_binary(Lang) ->
-    iter_procs(Tab, binary_to_list(Lang), Fun, Acc);
+iter_procs(Tab, Lang, Fun, Acc) when is_list(Lang) ->
+    iter_procs(Tab, list_to_binary(Lang), Fun, Acc);
 iter_procs(Tab, Lang, Fun, Acc) ->
     Pattern = #proc{lang=Lang, client=nil, _='_'},
     MSpec = [{Pattern, [], ['$_']}],
@@ -216,15 +228,17 @@ iter_procs({[Proc | Rest], Continuation}, Fun, Acc0) ->
             {ok, Acc1}
-new_proc(From, Lang) ->
+new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) ->
+    #client{from=From, lang=Lang} = Client,
     case new_proc_int(From, Lang) of
     {ok, Proc} ->
         exit({ok, Proc, From});
     Error ->
         gen_server:reply(From, {error, Error})
-    end.
+    end;
-new_proc(From, Lang, DDoc, DDocKey) ->
+new_proc(Client) ->
+    #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey} = Client,
     case new_proc_int(From, Lang) of
     {ok, NewProc} ->
         case proc_with_ddoc(DDoc, DDocKey, [NewProc]) of
@@ -267,19 +281,52 @@ make_proc(Pid, Lang, Mod) ->
     {ok, Proc}.
-assign_proc(Tab, Client, #proc{client=nil}=Proc0) ->
-    Proc = Proc0#proc{client = erlang:monitor(process, Client)},
+assign_proc(Tab, ClientPid, #proc{client=nil}=Proc0) when is_pid(ClientPid) ->
+    Proc = Proc0#proc{client = erlang:monitor(process, ClientPid)},
     ets:insert(Tab, Proc),
-    Proc.
+    Proc;
+assign_proc(Tab, #client{}=Client, #proc{client=nil}=Proc) ->
+    {Pid, _} = Client#client.from,
+    assign_proc(Tab, Pid, Proc).
-return_proc(Tab, #proc{pid=Pid} = Proc) ->
+return_proc(State, #proc{pid=Pid, lang=Lang} = Proc) ->
+    #state{tab=Tab, waiting=Waiting} = State,
     case is_process_alive(Pid) of true ->
-        gen_server:cast(Pid, garbage_collect),
-        ets:insert(Tab, Proc#proc{client=nil});
+        case get_waiting_client(Waiting, Lang) of
+            nil ->
+                gen_server:cast(Pid, garbage_collect),
+                ets:insert(Tab, Proc#proc{client=nil}),
+                State;
+            #client{}=Client ->
+                From = Client#client.from,
+                assign_proc(Tab, Client, Proc#proc{client=nil}),
+                gen_server:reply(From, {ok, Proc, State#state.config}),
+                State
+        end;
     false ->
-        ets:delete(Tab, Pid)
+        ets:delete(Tab, Pid),
+        case get_waiting_client(Waiting, Lang) of
+            nil ->
+                State;
+            #client{}=Client ->
+                maybe_spawn_proc(State, Client)
+        end
+get_waiting_client(Tab, Lang) when is_list(Lang) ->
+    get_waiting_client(Tab, couch_util:to_binary(Lang));
+get_waiting_client(Tab, Lang) ->
+    case ets:match_object(Tab, #client{lang=Lang, _='_'}, 1) of
+        '$end_of_table' ->
+            nil;
+        {[#client{}=Client], _} ->
+            ets:delete(Tab, Client#client.timestamp),
+            Client
+    end.
+add_waiting_client(Tab, Client) ->
+    ets:insert(Tab, Client#client{timestamp=now()}).
 get_proc_config() ->
     Limit = config:get("query_server_config", "reduce_limit", "true"),
     Timeout = config:get("couchdb", "os_process_timeout", "5000"),
@@ -317,3 +364,32 @@ teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
     Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
     % add ddoc to the proc
     {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
+maybe_spawn_proc(State, Client) ->
+    #state{proc_counts=Counts, waiting=Waiting} = State,
+    #client{lang=Lang} = Client,
+    Limit = list_to_integer(config:get(
+                "query_server_config", "os_process_limit", "100")),
+    case dict:find(Lang, Counts) of
+    {ok, Limit} ->
+        add_waiting_client(Waiting, Client),
+        State;
+    _ ->
+        spawn_link(?MODULE, new_proc, [Client]),
+        State#state{
+            proc_counts=dict:update_counter(Lang, 1, Counts)
+        }
+    end.
+find_proc(State, Client, [Fun|FindFuns]) ->
+    try iter_procs(, Client#client.lang, Fun, nil) of
+    {not_found, _} ->
+        find_proc(State, Client, FindFuns);
+    {ok, Proc} ->
+        {reply, {ok, Proc, State#state.config}, State}
+    catch error:Reason ->
+        ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
+        {reply, {error, Reason}, State}
+    end;
+find_proc(State, Client, []) ->
+    {noreply, maybe_spawn_proc(State, Client)}.

[2/3] couch commit: updated refs/heads/1843-feature-bigcouch to d04839e

Rearrange couch_proc_manager for readability. No functional changes.


Branch: refs/heads/1843-feature-bigcouch
Commit: a19a8a1f2b54669ea1482905883ce35db84e6572
Parents: 135dd70
Author: Benjamin Bastian <>
Authored: Sat Feb 15 23:31:29 2014 -0800
Committer: Benjamin Bastian <>
Committed: Mon Feb 17 10:45:53 2014 -0800

 src/couch_proc_manager.erl | 124 ++++++++++++++++++++--------------------
 1 file changed, 62 insertions(+), 62 deletions(-)
diff --git a/src/couch_proc_manager.erl b/src/couch_proc_manager.erl
index 5dd000b..c8a5b77 100644
--- a/src/couch_proc_manager.erl
+++ b/src/couch_proc_manager.erl
@@ -201,6 +201,19 @@ handle_config_change("query_server_config", _, _, _, _) ->
 handle_config_change(_, _, _, _, _) ->
     {ok, nil}.
+find_proc(State, Client, [Fun|FindFuns]) ->
+    try iter_procs(, Client#client.lang, Fun, nil) of
+    {not_found, _} ->
+        find_proc(State, Client, FindFuns);
+    {ok, Proc} ->
+        {reply, {ok, Proc, State#state.config}, State}
+    catch error:Reason ->
+        ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
+        {reply, {error, Reason}, State}
+    end;
+find_proc(State, Client, []) ->
+    {noreply, maybe_spawn_proc(State, Client)}.
 iter_procs(Tab, Lang, Fun, Acc) when is_list(Lang) ->
     iter_procs(Tab, list_to_binary(Lang), Fun, Acc);
 iter_procs(Tab, Lang, Fun, Acc) ->
@@ -269,6 +282,36 @@ new_proc_int(From, Lang) when is_list(Lang) ->
         make_proc(Pid, Lang, couch_os_process)
+proc_with_ddoc(DDoc, DDocKey, Procs) ->
+    Filter = fun(#proc{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end,
+    case lists:dropwhile(Filter, Procs) of
+    [DDocProc|_] ->
+        {ok, DDocProc};
+    [] ->
+        teach_any_proc(DDoc, DDocKey, Procs)
+    end.
+teach_any_proc(DDoc, DDocKey, [Proc|Rest]) ->
+    try
+        teach_ddoc(DDoc, DDocKey, Proc)
+    catch _:_ ->
+        teach_any_proc(DDoc, DDocKey, Rest)
+    end;
+teach_any_proc(_, _, []) ->
+    {error, noproc}.
+teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
+    % send ddoc over the wire
+    % we only share the rev with the client we know to update code
+    % but it only keeps the latest copy, per each ddoc, around.
+    true = couch_query_servers:proc_prompt(Proc, [<<"ddoc">>, <<"new">>,
+        DDocId, couch_doc:to_json_obj(DDoc, [])]),
+    % we should remove any other ddocs keys for this docid
+    % because the query server overwrites without the rev
+    Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
+    % add ddoc to the proc
+    {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
 make_proc(Pid, Lang, Mod) ->
     Proc = #proc{
         lang = Lang,
@@ -313,6 +356,25 @@ return_proc(State, #proc{pid=Pid, lang=Lang} = Proc) ->
+maybe_spawn_proc(State, Client) ->
+    #state{proc_counts=Counts, waiting=Waiting} = State,
+    #client{lang=Lang} = Client,
+    Limit = list_to_integer(config:get(
+                "query_server_config", "os_process_limit", "100")),
+    case dict:find(Lang, Counts) of
+    {ok, Limit} ->
+        add_waiting_client(Waiting, Client),
+        State;
+    _ ->
+        spawn_link(?MODULE, new_proc, [Client]),
+        State#state{
+            proc_counts=dict:update_counter(Lang, 1, Counts)
+        }
+    end.
+add_waiting_client(Tab, Client) ->
+    ets:insert(Tab, Client#client{timestamp=now()}).
 get_waiting_client(Tab, Lang) when is_list(Lang) ->
     get_waiting_client(Tab, couch_util:to_binary(Lang));
 get_waiting_client(Tab, Lang) ->
@@ -324,9 +386,6 @@ get_waiting_client(Tab, Lang) ->
-add_waiting_client(Tab, Client) ->
-    ets:insert(Tab, Client#client{timestamp=now()}).
 get_proc_config() ->
     Limit = config:get("query_server_config", "reduce_limit", "true"),
     Timeout = config:get("couchdb", "os_process_timeout", "5000"),
@@ -334,62 +393,3 @@ get_proc_config() ->
         {<<"reduce_limit">>, list_to_atom(Limit)},
         {<<"timeout">>, list_to_integer(Timeout)}
-proc_with_ddoc(DDoc, DDocKey, Procs) ->
-    Filter = fun(#proc{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end,
-    case lists:dropwhile(Filter, Procs) of
-    [DDocProc|_] ->
-        {ok, DDocProc};
-    [] ->
-        teach_any_proc(DDoc, DDocKey, Procs)
-    end.
-teach_any_proc(DDoc, DDocKey, [Proc|Rest]) ->
-    try
-        teach_ddoc(DDoc, DDocKey, Proc)
-    catch _:_ ->
-        teach_any_proc(DDoc, DDocKey, Rest)
-    end;
-teach_any_proc(_, _, []) ->
-    {error, noproc}.
-teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
-    % send ddoc over the wire
-    % we only share the rev with the client we know to update code
-    % but it only keeps the latest copy, per each ddoc, around.
-    true = couch_query_servers:proc_prompt(Proc, [<<"ddoc">>, <<"new">>,
-        DDocId, couch_doc:to_json_obj(DDoc, [])]),
-    % we should remove any other ddocs keys for this docid
-    % because the query server overwrites without the rev
-    Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
-    % add ddoc to the proc
-    {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
-maybe_spawn_proc(State, Client) ->
-    #state{proc_counts=Counts, waiting=Waiting} = State,
-    #client{lang=Lang} = Client,
-    Limit = list_to_integer(config:get(
-                "query_server_config", "os_process_limit", "100")),
-    case dict:find(Lang, Counts) of
-    {ok, Limit} ->
-        add_waiting_client(Waiting, Client),
-        State;
-    _ ->
-        spawn_link(?MODULE, new_proc, [Client]),
-        State#state{
-            proc_counts=dict:update_counter(Lang, 1, Counts)
-        }
-    end.
-find_proc(State, Client, [Fun|FindFuns]) ->
-    try iter_procs(, Client#client.lang, Fun, nil) of
-    {not_found, _} ->
-        find_proc(State, Client, FindFuns);
-    {ok, Proc} ->
-        {reply, {ok, Proc, State#state.config}, State}
-    catch error:Reason ->
-        ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
-        {reply, {error, Reason}, State}
-    end;
-find_proc(State, Client, []) ->
-    {noreply, maybe_spawn_proc(State, Client)}.

[3/3] couch commit: updated refs/heads/1843-feature-bigcouch to d04839e

Merge pull request #1 from 'sagelywizard/2002-hard-os-proc-cap'

Add hard limit for OS processes


Branch: refs/heads/1843-feature-bigcouch
Commit: d04839eaf77620ce10c69d7f65d1641b8805822e
Parents: b130524 a19a8a1
Author: Paul J. Davis <>
Authored: Mon Feb 17 12:57:28 2014 -0600
Committer: Paul J. Davis <>
Committed: Mon Feb 17 12:57:28 2014 -0600

 src/couch_proc_manager.erl | 264 ++++++++++++++++++++++++++--------------
 1 file changed, 170 insertions(+), 94 deletions(-)