You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2014/02/17 19:59:13 UTC
[2/3] couch commit: updated refs/heads/1843-feature-bigcouch to d04839e
Rearrange couch_proc_manager for readability. No functional changes.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/a19a8a1f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/a19a8a1f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/a19a8a1f
Branch: refs/heads/1843-feature-bigcouch
Commit: a19a8a1f2b54669ea1482905883ce35db84e6572
Parents: 135dd70
Author: Benjamin Bastian <be...@gmail.com>
Authored: Sat Feb 15 23:31:29 2014 -0800
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Mon Feb 17 10:45:53 2014 -0800
----------------------------------------------------------------------
src/couch_proc_manager.erl | 124 ++++++++++++++++++++--------------------
1 file changed, 62 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/a19a8a1f/src/couch_proc_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_proc_manager.erl b/src/couch_proc_manager.erl
index 5dd000b..c8a5b77 100644
--- a/src/couch_proc_manager.erl
+++ b/src/couch_proc_manager.erl
@@ -201,6 +201,19 @@ handle_config_change("query_server_config", _, _, _, _) ->
handle_config_change(_, _, _, _, _) ->
{ok, nil}.
+find_proc(State, Client, [Fun|FindFuns]) ->
+ try iter_procs(State#state.tab, Client#client.lang, Fun, nil) of
+ {not_found, _} ->
+ find_proc(State, Client, FindFuns);
+ {ok, Proc} ->
+ {reply, {ok, Proc, State#state.config}, State}
+ catch error:Reason ->
+ ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
+ {reply, {error, Reason}, State}
+ end;
+find_proc(State, Client, []) ->
+ {noreply, maybe_spawn_proc(State, Client)}.
+
iter_procs(Tab, Lang, Fun, Acc) when is_list(Lang) ->
iter_procs(Tab, list_to_binary(Lang), Fun, Acc);
iter_procs(Tab, Lang, Fun, Acc) ->
@@ -269,6 +282,36 @@ new_proc_int(From, Lang) when is_list(Lang) ->
make_proc(Pid, Lang, couch_os_process)
end.
+proc_with_ddoc(DDoc, DDocKey, Procs) ->
+ Filter = fun(#proc{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end,
+ case lists:dropwhile(Filter, Procs) of
+ [DDocProc|_] ->
+ {ok, DDocProc};
+ [] ->
+ teach_any_proc(DDoc, DDocKey, Procs)
+ end.
+
+teach_any_proc(DDoc, DDocKey, [Proc|Rest]) ->
+ try
+ teach_ddoc(DDoc, DDocKey, Proc)
+ catch _:_ ->
+ teach_any_proc(DDoc, DDocKey, Rest)
+ end;
+teach_any_proc(_, _, []) ->
+ {error, noproc}.
+
+teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
+ % send ddoc over the wire
+ % we only share the rev with the client we know to update code
+ % but it only keeps the latest copy, per each ddoc, around.
+ true = couch_query_servers:proc_prompt(Proc, [<<"ddoc">>, <<"new">>,
+ DDocId, couch_doc:to_json_obj(DDoc, [])]),
+ % we should remove any other ddocs keys for this docid
+ % because the query server overwrites without the rev
+ Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
+ % add ddoc to the proc
+ {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
+
make_proc(Pid, Lang, Mod) ->
Proc = #proc{
lang = Lang,
@@ -313,6 +356,25 @@ return_proc(State, #proc{pid=Pid, lang=Lang} = Proc) ->
end
end.
+maybe_spawn_proc(State, Client) ->
+ #state{proc_counts=Counts, waiting=Waiting} = State,
+ #client{lang=Lang} = Client,
+ Limit = list_to_integer(config:get(
+ "query_server_config", "os_process_limit", "100")),
+ case dict:find(Lang, Counts) of
+ {ok, Limit} ->
+ add_waiting_client(Waiting, Client),
+ State;
+ _ ->
+ spawn_link(?MODULE, new_proc, [Client]),
+ State#state{
+ proc_counts=dict:update_counter(Lang, 1, Counts)
+ }
+ end.
+
+add_waiting_client(Tab, Client) ->
+ ets:insert(Tab, Client#client{timestamp=now()}).
+
get_waiting_client(Tab, Lang) when is_list(Lang) ->
get_waiting_client(Tab, couch_util:to_binary(Lang));
get_waiting_client(Tab, Lang) ->
@@ -324,9 +386,6 @@ get_waiting_client(Tab, Lang) ->
Client
end.
-add_waiting_client(Tab, Client) ->
- ets:insert(Tab, Client#client{timestamp=now()}).
-
get_proc_config() ->
Limit = config:get("query_server_config", "reduce_limit", "true"),
Timeout = config:get("couchdb", "os_process_timeout", "5000"),
@@ -334,62 +393,3 @@ get_proc_config() ->
{<<"reduce_limit">>, list_to_atom(Limit)},
{<<"timeout">>, list_to_integer(Timeout)}
]}.
-
-proc_with_ddoc(DDoc, DDocKey, Procs) ->
- Filter = fun(#proc{ddoc_keys=Keys}) -> not lists:member(DDocKey, Keys) end,
- case lists:dropwhile(Filter, Procs) of
- [DDocProc|_] ->
- {ok, DDocProc};
- [] ->
- teach_any_proc(DDoc, DDocKey, Procs)
- end.
-
-teach_any_proc(DDoc, DDocKey, [Proc|Rest]) ->
- try
- teach_ddoc(DDoc, DDocKey, Proc)
- catch _:_ ->
- teach_any_proc(DDoc, DDocKey, Rest)
- end;
-teach_any_proc(_, _, []) ->
- {error, noproc}.
-
-teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc{ddoc_keys=Keys}=Proc) ->
- % send ddoc over the wire
- % we only share the rev with the client we know to update code
- % but it only keeps the latest copy, per each ddoc, around.
- true = couch_query_servers:proc_prompt(Proc, [<<"ddoc">>, <<"new">>,
- DDocId, couch_doc:to_json_obj(DDoc, [])]),
- % we should remove any other ddocs keys for this docid
- % because the query server overwrites without the rev
- Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
- % add ddoc to the proc
- {ok, Proc#proc{ddoc_keys=[DDocKey|Keys2]}}.
-
-maybe_spawn_proc(State, Client) ->
- #state{proc_counts=Counts, waiting=Waiting} = State,
- #client{lang=Lang} = Client,
- Limit = list_to_integer(config:get(
- "query_server_config", "os_process_limit", "100")),
- case dict:find(Lang, Counts) of
- {ok, Limit} ->
- add_waiting_client(Waiting, Client),
- State;
- _ ->
- spawn_link(?MODULE, new_proc, [Client]),
- State#state{
- proc_counts=dict:update_counter(Lang, 1, Counts)
- }
- end.
-
-find_proc(State, Client, [Fun|FindFuns]) ->
- try iter_procs(State#state.tab, Client#client.lang, Fun, nil) of
- {not_found, _} ->
- find_proc(State, Client, FindFuns);
- {ok, Proc} ->
- {reply, {ok, Proc, State#state.config}, State}
- catch error:Reason ->
- ?LOG_ERROR("~p ~p ~p", [?MODULE, Reason, erlang:get_stacktrace()]),
- {reply, {error, Reason}, State}
- end;
-find_proc(State, Client, []) ->
- {noreply, maybe_spawn_proc(State, Client)}.