You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2019/08/23 16:20:22 UTC

[couchdb] 02/05: Initial creation of couch_js application

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/fdb-layer-couch-eval
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 4f8eb89ee531942edfc0e7bcab188f6f3c143fad
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Tue Aug 20 12:43:14 2019 -0500

    Initial creation of couch_js application
    
    This commit is mostly a copy paste of the existing modules in the
    `couch` application. For now I've left the build of the `couchjs`
    executable in `couch/priv` to avoid having to do the work of moving the
    build config over. I had contemplated just referencing the modules as
    they current exist but decided this would prepare us a bit better for
    when we eventually remove the old modules.
---
 rebar.config.script                          |   1 +
 rel/reltool.config                           |   2 +
 src/couch_js/README.md                       |   6 +
 src/couch_js/src/couch_js.app.src            |  27 ++
 src/couch_js/src/couch_js_app.erl            |  31 ++
 src/couch_js/src/couch_js_io_logger.erl      | 107 +++++
 src/couch_js/src/couch_js_native_process.erl | 452 ++++++++++++++++++
 src/couch_js/src/couch_js_os_process.erl     | 265 +++++++++++
 src/couch_js/src/couch_js_proc_manager.erl   | 602 +++++++++++++++++++++++
 src/couch_js/src/couch_js_query_servers.erl  | 683 +++++++++++++++++++++++++++
 src/couch_js/src/couch_js_sup.erl            |  45 ++
 11 files changed, 2221 insertions(+)

diff --git a/rebar.config.script b/rebar.config.script
index ba7b754..8ef1abc 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -79,6 +79,7 @@ SubDirs = [
     "src/mem3",
     "src/couch_index",
     "src/couch_mrview",
+    "src/couch_js",
     "src/couch_replicator",
     "src/couch_plugins",
     "src/couch_pse_tests",
diff --git a/rel/reltool.config b/rel/reltool.config
index e2ae71c..caeea38 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -41,6 +41,7 @@
         couch_replicator,
         couch_stats,
         couch_eval,
+        couch_js,
         couch_event,
         couch_peruser,
         couch_views,
@@ -95,6 +96,7 @@
     {app, couch, [{incl_cond, include}]},
     {app, couch_epi, [{incl_cond, include}]},
     {app, couch_eval, [{incl_cond, include}]},
+    {app, couch_js, [{incl_cond, include}]},
     {app, couch_jobs, [{incl_cond, include}]},
     {app, couch_index, [{incl_cond, include}]},
     {app, couch_log, [{incl_cond, include}]},
diff --git a/src/couch_js/README.md b/src/couch_js/README.md
new file mode 100644
index 0000000..4084b7d
--- /dev/null
+++ b/src/couch_js/README.md
@@ -0,0 +1,6 @@
+couch_js
+===
+
+This application is just an isolation of most of the code required for running couchjs.
+
+For the time being I'm not moving the implementation of couchjs due to the specifics of the build system configuration. Once we go to remove the `couch` application we'll have to revisit that approach.
\ No newline at end of file
diff --git a/src/couch_js/src/couch_js.app.src b/src/couch_js/src/couch_js.app.src
new file mode 100644
index 0000000..0db37b6
--- /dev/null
+++ b/src/couch_js/src/couch_js.app.src
@@ -0,0 +1,27 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_js, [
+    {description, "An OTP application"},
+    {vsn, git},
+    {registered, [
+        couch_js_proc_manager
+    ]},
+    {mod, {couch_js_app, []}},
+    {applications, [
+        kernel,
+        stdlib,
+        config,
+        couch_log,
+        couch
+    ]}
+ ]}.
diff --git a/src/couch_js/src/couch_js_app.erl b/src/couch_js/src/couch_js_app.erl
new file mode 100644
index 0000000..b28f585
--- /dev/null
+++ b/src/couch_js/src/couch_js_app.erl
@@ -0,0 +1,31 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_js_app).
+
+
+-behaviour(application).
+
+
+-export([
+    start/2,
+    stop/1
+]).
+
+
+start(_StartType, _StartArgs) ->
+    couch_js_sup:start_link().
+
+
+stop(_State) ->
+    ok.
\ No newline at end of file
diff --git a/src/couch_js/src/couch_js_io_logger.erl b/src/couch_js/src/couch_js_io_logger.erl
new file mode 100644
index 0000000..5a1695c
--- /dev/null
+++ b/src/couch_js/src/couch_js_io_logger.erl
@@ -0,0 +1,107 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_js_io_logger).
+
+-export([
+    start/1,
+    log_output/1,
+    log_input/1,
+    stop_noerror/0,
+    stop_error/1
+]).
+
+
+start(undefined) ->
+    ok;
+start(Dir) ->
+    case filelib:is_dir(Dir) of
+        true ->
+            Name = log_name(),
+            Path = Dir ++ "/" ++ Name,
+            OPath = Path ++ ".out.log_",
+            IPath = Path ++ ".in.log_",
+            {ok, OFd} = file:open(OPath, [read, write, raw]),
+            {ok, IFd} = file:open(IPath, [read, write, raw]),
+            ok = file:delete(OPath),
+            ok = file:delete(IPath),
+            put(logger_path, Path),
+            put(logger_out_fd, OFd),
+            put(logger_in_fd, IFd),
+            ok;
+        false ->
+            ok
+    end.
+
+
+stop_noerror() ->
+    case get(logger_path) of
+        undefined ->
+            ok;
+        _Path ->
+            close_logs()
+    end.
+
+
+stop_error(Err) ->
+    case get(logger_path) of
+        undefined ->
+            ok;
+        Path ->
+            save_error_logs(Path, Err),
+            close_logs()
+    end.
+
+
+log_output(Data) ->
+    log(get(logger_out_fd), Data).
+
+
+log_input(Data) ->
+    log(get(logger_in_fd), Data).
+
+
+unix_time() ->
+    {Mega, Sec, USec} = os:timestamp(),
+    UnixTs = (Mega * 1000000 + Sec) * 1000000 + USec,
+    integer_to_list(UnixTs).
+
+
+log_name() ->
+    Ts = unix_time(),
+    Pid0 = erlang:pid_to_list(self()),
+    Pid1 = string:strip(Pid0, left, $<),
+    Pid2 = string:strip(Pid1, right, $>),
+    lists:flatten(io_lib:format("~s_~s", [Ts, Pid2])).
+
+
+close_logs() ->
+    file:close(get(logger_out_fd)),
+    file:close(get(logger_in_fd)).
+
+
+save_error_logs(Path, Err) ->
+    Otp = erlang:system_info(otp_release),
+    Msg = io_lib:format("Error: ~p~nNode: ~p~nOTP: ~p~n", [Err, node(), Otp]),
+    file:write_file(Path ++ ".meta", Msg),
+    IFd = get(logger_out_fd),
+    OFd = get(logger_in_fd),
+    file:position(IFd, 0),
+    file:position(OFd, 0),
+    file:copy(IFd, Path ++  ".out.log"),
+    file:copy(OFd, Path ++ ".in.log").
+
+
+log(undefined, _Data) ->
+    ok;
+log(Fd, Data) ->
+    ok = file:write(Fd, [Data, io_lib:nl()]).
diff --git a/src/couch_js/src/couch_js_native_process.erl b/src/couch_js/src/couch_js_native_process.erl
new file mode 100644
index 0000000..d2c4c1e
--- /dev/null
+++ b/src/couch_js/src/couch_js_native_process.erl
@@ -0,0 +1,452 @@
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+%
+% You may obtain a copy of the License at
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing,
+% software distributed under the License is distributed on an
+% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+% either express or implied.
+%
+% See the License for the specific language governing permissions
+% and limitations under the License.
+%
+% This file drew much inspiration from erlview, which was written by and
+% copyright Michael McDaniel [http://autosys.us], and is also under APL 2.0
+%
+%
+% This module provides the smallest possible native view-server.
+% With this module in-place, you can add the following to your couch INI files:
+%  [native_query_servers]
+%  erlang={couch_native_process, start_link, []}
+%
+% Which will then allow following example map function to be used:
+%
+%  fun({Doc}) ->
+%    % Below, we emit a single record - the _id as key, null as value
+%    DocId = couch_util:get_value(<<"_id">>, Doc, null),
+%    Emit(DocId, null)
+%  end.
+%
+% which should be roughly the same as the javascript:
+%    emit(doc._id, null);
+%
+% This module exposes enough functions such that a native erlang server can
+% act as a fully-fleged view server, but no 'helper' functions specifically
+% for simplifying your erlang view code.  It is expected other third-party
+% extensions will evolve which offer useful layers on top of this view server
+% to help simplify your view code.
+-module(couch_js_native_process).
+-behaviour(gen_server).
+-vsn(1).
+
+-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,
+         handle_info/2]).
+-export([set_timeout/2, prompt/2]).
+
+-define(STATE, native_proc_state).
+-record(evstate, {
+    ddocs,
+    funs = [],
+    query_config = [],
+    list_pid = nil,
+    timeout = 5000,
+    idle = 5000
+}).
+
+-include_lib("couch/include/couch_db.hrl").
+
+start_link() ->
+    gen_server:start_link(?MODULE, [], []).
+
+% this is a bit messy, see also couch_query_servers handle_info
+% stop(_Pid) ->
+%     ok.
+
+set_timeout(Pid, TimeOut) ->
+    gen_server:call(Pid, {set_timeout, TimeOut}).
+
+prompt(Pid, Data) when is_list(Data) ->
+    gen_server:call(Pid, {prompt, Data}).
+
+% gen_server callbacks
+init([]) ->
+    V = config:get("query_server_config", "os_process_idle_limit", "300"),
+    Idle = list_to_integer(V) * 1000,
+    {ok, #evstate{ddocs=dict:new(), idle=Idle}, Idle}.
+
+handle_call({set_timeout, TimeOut}, _From, State) ->
+    {reply, ok, State#evstate{timeout=TimeOut}, State#evstate.idle};
+
+handle_call({prompt, Data}, _From, State) ->
+    couch_log:debug("Prompt native qs: ~s",[?JSON_ENCODE(Data)]),
+    {NewState, Resp} = try run(State, to_binary(Data)) of
+        {S, R} -> {S, R}
+        catch
+            throw:{error, Why} ->
+                {State, [<<"error">>, Why, Why]}
+        end,
+
+    Idle = State#evstate.idle,
+    case Resp of
+        {error, Reason} ->
+            Msg = io_lib:format("couch native server error: ~p", [Reason]),
+            Error = [<<"error">>, <<"native_query_server">>, list_to_binary(Msg)],
+            {reply, Error, NewState, Idle};
+        [<<"error">> | Rest] ->
+            % Msg = io_lib:format("couch native server error: ~p", [Rest]),
+            % TODO: markh? (jan)
+            {reply, [<<"error">> | Rest], NewState, Idle};
+        [<<"fatal">> | Rest] ->
+            % Msg = io_lib:format("couch native server error: ~p", [Rest]),
+            % TODO: markh? (jan)
+            {stop, fatal, [<<"error">> | Rest], NewState};
+        Resp ->
+            {reply, Resp, NewState, Idle}
+    end.
+
+handle_cast(garbage_collect, State) ->
+    erlang:garbage_collect(),
+    {noreply, State, State#evstate.idle};
+handle_cast(stop, State) ->
+    {stop, normal, State};
+handle_cast(_Msg, State) ->
+    {noreply, State, State#evstate.idle}.
+
+handle_info(timeout, State) ->
+    gen_server:cast(couch_js_proc_manager, {os_proc_idle, self()}),
+    erlang:garbage_collect(),
+    {noreply, State, State#evstate.idle};
+handle_info({'EXIT',_,normal}, State) ->
+    {noreply, State, State#evstate.idle};
+handle_info({'EXIT',_,Reason}, State) ->
+    {stop, Reason, State}.
+terminate(_Reason, _State) -> ok.
+code_change(_OldVersion, State, _Extra) -> {ok, State}.
+
+run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) ->
+    Pid ! {self(), list_row, Row},
+    receive
+        {Pid, chunks, Data} ->
+            {State, [<<"chunks">>, Data]};
+        {Pid, list_end, Data} ->
+            receive
+                {'EXIT', Pid, normal} -> ok
+            after State#evstate.timeout ->
+                throw({timeout, list_cleanup})
+            end,
+            process_flag(trap_exit, erlang:get(do_trap)),
+            {State#evstate{list_pid=nil}, [<<"end">>, Data]}
+    after State#evstate.timeout ->
+        throw({timeout, list_row})
+    end;
+run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) ->
+    Pid ! {self(), list_end},
+    Resp =
+    receive
+        {Pid, list_end, Data} ->
+            receive
+                {'EXIT', Pid, normal} -> ok
+            after State#evstate.timeout ->
+                throw({timeout, list_cleanup})
+            end,
+            [<<"end">>, Data]
+    after State#evstate.timeout ->
+        throw({timeout, list_end})
+    end,
+    process_flag(trap_exit, erlang:get(do_trap)),
+    {State#evstate{list_pid=nil}, Resp};
+run(#evstate{list_pid=Pid}=State, _Command) when is_pid(Pid) ->
+    {State, [<<"error">>, list_error, list_error]};
+run(#evstate{ddocs=DDocs}, [<<"reset">>]) ->
+    {#evstate{ddocs=DDocs}, true};
+run(#evstate{ddocs=DDocs, idle=Idle}, [<<"reset">>, QueryConfig]) ->
+    NewState = #evstate{
+        ddocs = DDocs,
+        query_config = QueryConfig,
+        idle = Idle
+    },
+    {NewState, true};
+run(#evstate{funs=Funs}=State, [<<"add_fun">> , BinFunc]) ->
+    FunInfo = makefun(State, BinFunc),
+    {State#evstate{funs=Funs ++ [FunInfo]}, true};
+run(State, [<<"map_doc">> , Doc]) ->
+    Resp = lists:map(fun({Sig, Fun}) ->
+        erlang:put(Sig, []),
+        Fun(Doc),
+        lists:reverse(erlang:get(Sig))
+    end, State#evstate.funs),
+    {State, Resp};
+run(State, [<<"reduce">>, Funs, KVs]) ->
+    {Keys, Vals} =
+    lists:foldl(fun([K, V], {KAcc, VAcc}) ->
+        {[K | KAcc], [V | VAcc]}
+    end, {[], []}, KVs),
+    Keys2 = lists:reverse(Keys),
+    Vals2 = lists:reverse(Vals),
+    {State, catch reduce(State, Funs, Keys2, Vals2, false)};
+run(State, [<<"rereduce">>, Funs, Vals]) ->
+    {State, catch reduce(State, Funs, null, Vals, true)};
+run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) ->
+    DDocs2 = store_ddoc(DDocs, DDocId, DDoc),
+    {State#evstate{ddocs=DDocs2}, true};
+run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, DDocId | Rest]) ->
+    DDoc = load_ddoc(DDocs, DDocId),
+    ddoc(State, DDoc, Rest);
+run(_, Unknown) ->
+    couch_log:error("Native Process: Unknown command: ~p~n", [Unknown]),
+    throw({error, unknown_command}).
+
+ddoc(State, {DDoc}, [FunPath, Args]) ->
+    % load fun from the FunPath
+    BFun = lists:foldl(fun
+        (Key, {Props}) when is_list(Props) ->
+            couch_util:get_value(Key, Props, nil);
+        (_Key, Fun) when is_binary(Fun) ->
+            Fun;
+        (_Key, nil) ->
+            throw({error, not_found});
+        (_Key, _Fun) ->
+            throw({error, malformed_ddoc})
+        end, {DDoc}, FunPath),
+    ddoc(State, makefun(State, BFun, {DDoc}), FunPath, Args).
+
+ddoc(State, {_, Fun}, [<<"validate_doc_update">>], Args) ->
+    {State, (catch apply(Fun, Args))};
+ddoc(State, {_, Fun}, [<<"rewrites">>], Args) ->
+    {State, (catch apply(Fun, Args))};
+ddoc(State, {_, Fun}, [<<"filters">>|_], [Docs, Req]) ->
+    FilterFunWrapper = fun(Doc) ->
+        case catch Fun(Doc, Req) of
+        true -> true;
+        false -> false;
+        {'EXIT', Error} -> couch_log:error("~p", [Error])
+        end
+    end,
+    Resp = lists:map(FilterFunWrapper, Docs),
+    {State, [true, Resp]};
+ddoc(State, {_, Fun}, [<<"views">>|_], [Docs]) ->
+    MapFunWrapper = fun(Doc) ->
+        case catch Fun(Doc) of
+        undefined -> true;
+        ok -> false;
+        false -> false;
+        [_|_] -> true;
+        {'EXIT', Error} -> couch_log:error("~p", [Error])
+        end
+    end,
+    Resp = lists:map(MapFunWrapper, Docs),
+    {State, [true, Resp]};
+ddoc(State, {_, Fun}, [<<"shows">>|_], Args) ->
+    Resp = case (catch apply(Fun, Args)) of
+        FunResp when is_list(FunResp) ->
+            FunResp;
+        {FunResp} ->
+            [<<"resp">>, {FunResp}];
+        FunResp ->
+            FunResp
+    end,
+    {State, Resp};
+ddoc(State, {_, Fun}, [<<"updates">>|_], Args) ->
+    Resp = case (catch apply(Fun, Args)) of
+        [JsonDoc, JsonResp]  ->
+            [<<"up">>, JsonDoc, JsonResp]
+    end,
+    {State, Resp};
+ddoc(State, {Sig, Fun}, [<<"lists">>|_], Args) ->
+    Self = self(),
+    SpawnFun = fun() ->
+        LastChunk = (catch apply(Fun, Args)),
+        case start_list_resp(Self, Sig) of
+            started ->
+                receive
+                    {Self, list_row, _Row} -> ignore;
+                    {Self, list_end} -> ignore
+                after State#evstate.timeout ->
+                    throw({timeout, list_cleanup_pid})
+                end;
+            _ ->
+                ok
+        end,
+        LastChunks =
+        case erlang:get(Sig) of
+            undefined -> [LastChunk];
+            OtherChunks -> [LastChunk | OtherChunks]
+        end,
+        Self ! {self(), list_end, lists:reverse(LastChunks)}
+    end,
+    erlang:put(do_trap, process_flag(trap_exit, true)),
+    Pid = spawn_link(SpawnFun),
+    Resp =
+    receive
+        {Pid, start, Chunks, JsonResp} ->
+            [<<"start">>, Chunks, JsonResp]
+    after State#evstate.timeout ->
+        throw({timeout, list_start})
+    end,
+    {State#evstate{list_pid=Pid}, Resp}.
+
+store_ddoc(DDocs, DDocId, DDoc) ->
+    dict:store(DDocId, DDoc, DDocs).
+load_ddoc(DDocs, DDocId) ->
+    try dict:fetch(DDocId, DDocs) of
+        {DDoc} -> {DDoc}
+    catch
+        _:_Else -> throw({error, ?l2b(io_lib:format("Native Query Server missing DDoc with Id: ~s",[DDocId]))})
+    end.
+
+bindings(State, Sig) ->
+    bindings(State, Sig, nil).
+bindings(State, Sig, DDoc) ->
+    Self = self(),
+
+    Log = fun(Msg) ->
+        couch_log:info(Msg, [])
+    end,
+
+    Emit = fun(Id, Value) ->
+        Curr = erlang:get(Sig),
+        erlang:put(Sig, [[Id, Value] | Curr])
+    end,
+
+    Start = fun(Headers) ->
+        erlang:put(list_headers, Headers)
+    end,
+
+    Send = fun(Chunk) ->
+        Curr =
+        case erlang:get(Sig) of
+            undefined -> [];
+            Else -> Else
+        end,
+        erlang:put(Sig, [Chunk | Curr])
+    end,
+
+    GetRow = fun() ->
+        case start_list_resp(Self, Sig) of
+            started ->
+                ok;
+            _ ->
+                Chunks =
+                case erlang:get(Sig) of
+                    undefined -> [];
+                    CurrChunks -> CurrChunks
+                end,
+                Self ! {self(), chunks, lists:reverse(Chunks)}
+        end,
+        erlang:put(Sig, []),
+        receive
+            {Self, list_row, Row} -> Row;
+            {Self, list_end} -> nil
+        after State#evstate.timeout ->
+            throw({timeout, list_pid_getrow})
+        end
+    end,
+
+    FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end,
+
+    Bindings = [
+        {'Log', Log},
+        {'Emit', Emit},
+        {'Start', Start},
+        {'Send', Send},
+        {'GetRow', GetRow},
+        {'FoldRows', FoldRows}
+    ],
+    case DDoc of
+        {_Props} ->
+            Bindings ++ [{'DDoc', DDoc}];
+        _Else -> Bindings
+    end.
+
+% thanks to erlview, via:
+% http://erlang.org/pipermail/erlang-questions/2003-November/010544.html
+makefun(State, Source) ->
+    Sig = couch_hash:md5_hash(Source),
+    BindFuns = bindings(State, Sig),
+    {Sig, makefun(State, Source, BindFuns)}.
+makefun(State, Source, {DDoc}) ->
+    Sig = couch_hash:md5_hash(lists:flatten([Source, term_to_binary(DDoc)])),
+    BindFuns = bindings(State, Sig, {DDoc}),
+    {Sig, makefun(State, Source, BindFuns)};
+makefun(_State, Source, BindFuns) when is_list(BindFuns) ->
+    FunStr = binary_to_list(Source),
+    {ok, Tokens, _} = erl_scan:string(FunStr),
+    Form = case (catch erl_parse:parse_exprs(Tokens)) of
+        {ok, [ParsedForm]} ->
+            ParsedForm;
+        {error, {LineNum, _Mod, [Mesg, Params]}}=Error ->
+            couch_log:error("Syntax error on line: ~p~n~s~p~n",
+                            [LineNum, Mesg, Params]),
+            throw(Error)
+    end,
+    Bindings = lists:foldl(fun({Name, Fun}, Acc) ->
+        erl_eval:add_binding(Name, Fun, Acc)
+    end, erl_eval:new_bindings(), BindFuns),
+    {value, Fun, _} = erl_eval:expr(Form, Bindings),
+    Fun.
+
+reduce(State, BinFuns, Keys, Vals, ReReduce) ->
+    Funs = case is_list(BinFuns) of
+        true ->
+            lists:map(fun(BF) -> makefun(State, BF) end, BinFuns);
+        _ ->
+            [makefun(State, BinFuns)]
+    end,
+    Reds = lists:map(fun({_Sig, Fun}) ->
+        Fun(Keys, Vals, ReReduce)
+    end, Funs),
+    [true, Reds].
+
+foldrows(GetRow, ProcRow, Acc) ->
+    case GetRow() of
+        nil ->
+            {ok, Acc};
+        Row ->
+            case (catch ProcRow(Row, Acc)) of
+                {ok, Acc2} ->
+                    foldrows(GetRow, ProcRow, Acc2);
+                {stop, Acc2} ->
+                    {ok, Acc2}
+            end
+    end.
+
+start_list_resp(Self, Sig) ->
+    case erlang:get(list_started) of
+        undefined ->
+            Headers =
+            case erlang:get(list_headers) of
+                undefined -> {[{<<"headers">>, {[]}}]};
+                CurrHdrs -> CurrHdrs
+            end,
+            Chunks =
+            case erlang:get(Sig) of
+                undefined -> [];
+                CurrChunks -> CurrChunks
+            end,
+            Self ! {self(), start, lists:reverse(Chunks), Headers},
+            erlang:put(list_started, true),
+            erlang:put(Sig, []),
+            started;
+        _ ->
+            ok
+    end.
+
+to_binary({Data}) ->
+    Pred = fun({Key, Value}) ->
+        {to_binary(Key), to_binary(Value)}
+    end,
+    {lists:map(Pred, Data)};
+to_binary(Data) when is_list(Data) ->
+    [to_binary(D) || D <- Data];
+to_binary(null) ->
+    null;
+to_binary(true) ->
+    true;
+to_binary(false) ->
+    false;
+to_binary(Data) when is_atom(Data) ->
+    list_to_binary(atom_to_list(Data));
+to_binary(Data) ->
+    Data.
diff --git a/src/couch_js/src/couch_js_os_process.erl b/src/couch_js/src/couch_js_os_process.erl
new file mode 100644
index 0000000..a453d1a
--- /dev/null
+++ b/src/couch_js/src/couch_js_os_process.erl
@@ -0,0 +1,265 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_js_os_process).
+-behaviour(gen_server).
+-vsn(1).
+
+-export([start_link/1, start_link/2, start_link/3, stop/1]).
+-export([set_timeout/2, prompt/2, killer/1]).
+-export([send/2, writeline/2, readline/1, writejson/2, readjson/1]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-define(PORT_OPTIONS, [stream, {line, 4096}, binary, exit_status, hide]).
+
+-record(os_proc,
+    {command,
+     port,
+     writer,
+     reader,
+     timeout=5000,
+     idle
+    }).
+
+start_link(Command) ->
+    start_link(Command, []).
+start_link(Command, Options) ->
+    start_link(Command, Options, ?PORT_OPTIONS).
+start_link(Command, Options, PortOptions) ->
+    gen_server:start_link(?MODULE, [Command, Options, PortOptions], []).
+
+stop(Pid) ->
+    gen_server:cast(Pid, stop).
+
+% Read/Write API
+set_timeout(Pid, TimeOut) when is_integer(TimeOut) ->
+    ok = gen_server:call(Pid, {set_timeout, TimeOut}, infinity).
+
+% Used by couch_event_os_process.erl
+send(Pid, Data) ->
+    gen_server:cast(Pid, {send, Data}).
+
+prompt(Pid, Data) ->
+    case ioq:call(Pid, {prompt, Data}, erlang:get(io_priority)) of
+        {ok, Result} ->
+            Result;
+        Error ->
+            couch_log:error("OS Process Error ~p :: ~p",[Pid,Error]),
+            throw(Error)
+    end.
+
+% Utility functions for reading and writing
+% in custom functions
+writeline(OsProc, Data) when is_record(OsProc, os_proc) ->
+    Res = port_command(OsProc#os_proc.port, [Data, $\n]),
+    couch_js_io_logger:log_output(Data),
+    Res.
+
+readline(#os_proc{} = OsProc) ->
+    Res = readline(OsProc, []),
+    couch_js_io_logger:log_input(Res),
+    Res.
+readline(#os_proc{port = Port} = OsProc, Acc) ->
+    receive
+    {Port, {data, {noeol, Data}}} when is_binary(Acc) ->
+        readline(OsProc, <<Acc/binary,Data/binary>>);
+    {Port, {data, {noeol, Data}}} when is_binary(Data) ->
+        readline(OsProc, Data);
+    {Port, {data, {noeol, Data}}} ->
+        readline(OsProc, [Data|Acc]);
+    {Port, {data, {eol, <<Data/binary>>}}} when is_binary(Acc) ->
+        [<<Acc/binary,Data/binary>>];
+    {Port, {data, {eol, Data}}} when is_binary(Data) ->
+        [Data];
+    {Port, {data, {eol, Data}}} ->
+        lists:reverse(Acc, Data);
+    {Port, Err} ->
+        catch port_close(Port),
+        throw({os_process_error, Err})
+    after OsProc#os_proc.timeout ->
+        catch port_close(Port),
+        throw({os_process_error, "OS process timed out."})
+    end.
+
+% Standard JSON functions
+writejson(OsProc, Data) when is_record(OsProc, os_proc) ->
+    JsonData = ?JSON_ENCODE(Data),
+    couch_log:debug("OS Process ~p Input  :: ~s",
+                    [OsProc#os_proc.port, JsonData]),
+    true = writeline(OsProc, JsonData).
+
+readjson(OsProc) when is_record(OsProc, os_proc) ->
+    Line = iolist_to_binary(readline(OsProc)),
+    couch_log:debug("OS Process ~p Output :: ~s", [OsProc#os_proc.port, Line]),
+    try
+        % Don't actually parse the whole JSON. Just try to see if it's
+        % a command or a doc map/reduce/filter/show/list/update output.
+        % If it's a command then parse the whole JSON and execute the
+        % command, otherwise return the raw JSON line to the caller.
+        pick_command(Line)
+    catch
+    throw:abort ->
+        {json, Line};
+    throw:{cmd, _Cmd} ->
+        case ?JSON_DECODE(Line) of
+        [<<"log">>, Msg] when is_binary(Msg) ->
+            % we got a message to log. Log it and continue
+            couch_log:info("OS Process ~p Log :: ~s",
+                           [OsProc#os_proc.port, Msg]),
+            readjson(OsProc);
+        [<<"error">>, Id, Reason] ->
+            throw({error, {couch_util:to_existing_atom(Id),Reason}});
+        [<<"fatal">>, Id, Reason] ->
+            couch_log:info("OS Process ~p Fatal Error :: ~s ~p",
+                [OsProc#os_proc.port, Id, Reason]),
+            throw({couch_util:to_existing_atom(Id),Reason});
+        _Result ->
+            {json, Line}
+        end
+    end.
+
+pick_command(Line) ->
+    json_stream_parse:events(Line, fun pick_command0/1).
+
+pick_command0(array_start) ->
+    fun pick_command1/1;
+pick_command0(_) ->
+    throw(abort).
+
+pick_command1(<<"log">> = Cmd) ->
+    throw({cmd, Cmd});
+pick_command1(<<"error">> = Cmd) ->
+    throw({cmd, Cmd});
+pick_command1(<<"fatal">> = Cmd) ->
+    throw({cmd, Cmd});
+pick_command1(_) ->
+    throw(abort).
+
+
+% gen_server API
+init([Command, Options, PortOptions]) ->
+    couch_js_io_logger:start(os:getenv("COUCHDB_IO_LOG_DIR")),
+    PrivDir = couch_util:priv_dir(),
+    Spawnkiller = "\"" ++ filename:join(PrivDir, "couchspawnkillable") ++ "\"",
+    V = config:get("query_server_config", "os_process_idle_limit", "300"),
+    IdleLimit = list_to_integer(V) * 1000,
+    BaseProc = #os_proc{
+        command=Command,
+        port=open_port({spawn, Spawnkiller ++ " " ++ Command}, PortOptions),
+        writer=fun ?MODULE:writejson/2,
+        reader=fun ?MODULE:readjson/1,
+        idle=IdleLimit
+    },
+    KillCmd = iolist_to_binary(readline(BaseProc)),
+    Pid = self(),
+    couch_log:debug("OS Process Start :: ~p", [BaseProc#os_proc.port]),
+    spawn(fun() ->
+            % this ensure the real os process is killed when this process dies.
+            erlang:monitor(process, Pid),
+            killer(?b2l(KillCmd))
+        end),
+    OsProc =
+    lists:foldl(fun(Opt, Proc) ->
+        case Opt of
+        {writer, Writer} when is_function(Writer) ->
+            Proc#os_proc{writer=Writer};
+        {reader, Reader} when is_function(Reader) ->
+            Proc#os_proc{reader=Reader};
+        {timeout, TimeOut} when is_integer(TimeOut) ->
+            Proc#os_proc{timeout=TimeOut}
+        end
+    end, BaseProc, Options),
+    {ok, OsProc, IdleLimit}.
+
+terminate(Reason, #os_proc{port=Port}) ->
+    catch port_close(Port),
+    case Reason of
+        normal ->
+            couch_js_io_logger:stop_noerror();
+        Error ->
+            couch_js_io_logger:stop_error(Error)
+    end,
+    ok.
+
+handle_call({set_timeout, TimeOut}, _From, #os_proc{idle=Idle}=OsProc) ->
+    {reply, ok, OsProc#os_proc{timeout=TimeOut}, Idle};
+handle_call({prompt, Data}, _From, #os_proc{idle=Idle}=OsProc) ->
+    #os_proc{writer=Writer, reader=Reader} = OsProc,
+    try
+        Writer(OsProc, Data),
+        {reply, {ok, Reader(OsProc)}, OsProc, Idle}
+    catch
+        throw:{error, OsError} ->
+            {reply, OsError, OsProc, Idle};
+        throw:{fatal, OsError} ->
+            {stop, normal, OsError, OsProc};
+        throw:OtherError ->
+            {stop, normal, OtherError, OsProc}
+    after
+        garbage_collect()
+    end.
+
+handle_cast({send, Data}, #os_proc{writer=Writer, idle=Idle}=OsProc) ->
+    try
+        Writer(OsProc, Data),
+        {noreply, OsProc, Idle}
+    catch
+        throw:OsError ->
+            couch_log:error("Failed sending data: ~p -> ~p", [Data, OsError]),
+            {stop, normal, OsProc}
+    end;
+handle_cast(garbage_collect, #os_proc{idle=Idle}=OsProc) ->
+    erlang:garbage_collect(),
+    {noreply, OsProc, Idle};
+handle_cast(stop, OsProc) ->
+    {stop, normal, OsProc};
+handle_cast(Msg, #os_proc{idle=Idle}=OsProc) ->
+    couch_log:debug("OS Proc: Unknown cast: ~p", [Msg]),
+    {noreply, OsProc, Idle}.
+
+handle_info(timeout, #os_proc{idle=Idle}=OsProc) ->
+    gen_server:cast(couch_js_proc_manager, {os_proc_idle, self()}),
+    erlang:garbage_collect(),
+    {noreply, OsProc, Idle};
+handle_info({Port, {exit_status, 0}}, #os_proc{port=Port}=OsProc) ->
+    couch_log:info("OS Process terminated normally", []),
+    {stop, normal, OsProc};
+handle_info({Port, {exit_status, Status}}, #os_proc{port=Port}=OsProc) ->
+    couch_log:error("OS Process died with status: ~p", [Status]),
+    {stop, {exit_status, Status}, OsProc};
+handle_info(Msg, #os_proc{idle=Idle}=OsProc) ->
+    couch_log:debug("OS Proc: Unknown info: ~p", [Msg]),
+    {noreply, OsProc, Idle}.
+
+code_change(_, {os_proc, Cmd, Port, W, R, Timeout} , _) ->
+    V = config:get("query_server_config","os_process_idle_limit","300"),
+    State = #os_proc{
+        command = Cmd,
+        port = Port,
+        writer = W,
+        reader = R,
+        timeout = Timeout,
+        idle = list_to_integer(V) * 1000
+    },
+    {ok, State};
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+killer(KillCmd) ->
+    receive _ ->
+        os:cmd(KillCmd)
+    after 1000 ->
+        ?MODULE:killer(KillCmd)
+    end.
+
diff --git a/src/couch_js/src/couch_js_proc_manager.erl b/src/couch_js/src/couch_js_proc_manager.erl
new file mode 100644
index 0000000..0964696
--- /dev/null
+++ b/src/couch_js/src/couch_js_proc_manager.erl
@@ -0,0 +1,602 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_js_proc_manager).
+-behaviour(gen_server).
+-behaviour(config_listener).
+-vsn(1).
+
+-export([
+    start_link/0,
+    get_proc_count/0,
+    get_stale_proc_count/0,
+    new_proc/1,
+    reload/0,
+    terminate_stale_procs/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+-export([
+    handle_config_change/5,
+    handle_config_terminate/3
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-define(PROCS, couch_js_proc_manager_procs).
+-define(WAITERS, couch_js_proc_manager_waiters).
+-define(OPENING, couch_js_proc_manager_opening).
+-define(SERVERS, couch_js_proc_manager_servers).
+-define(RELISTEN_DELAY, 5000).
+
+-record(state, {
+    config,
+    counts,
+    threshold_ts,
+    hard_limit,
+    soft_limit
+}).
+
+-type docid() :: iodata().
+-type revision() :: {integer(), binary()}.
+
+-record(client, {
+    timestamp :: os:timestamp() | '_',
+    from :: undefined | {pid(), reference()}  | '_',
+    lang :: binary() | '_',
+    ddoc :: #doc{} | '_',
+    ddoc_key :: undefined | {DDocId :: docid(), Rev :: revision()} | '_'
+}).
+
+-record(proc_int, {
+    pid,
+    lang,
+    client,
+    ddoc_keys = [],
+    prompt_fun,
+    set_timeout_fun,
+    stop_fun,
+    t0 = os:timestamp()
+}).
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+get_proc_count() ->
+    gen_server:call(?MODULE, get_proc_count).
+
+
+get_stale_proc_count() ->
+    gen_server:call(?MODULE, get_stale_proc_count).
+
+
+reload() ->
+    gen_server:call(?MODULE, set_threshold_ts).
+
+
+terminate_stale_procs() ->
+    gen_server:call(?MODULE, terminate_stale_procs).
+
+
+init([]) ->
+    process_flag(trap_exit, true),
+    ok = config:listen_for_changes(?MODULE, undefined),
+
+    TableOpts = [public, named_table, ordered_set],
+    ets:new(?PROCS, TableOpts ++ [{keypos, #proc_int.pid}]),
+    ets:new(?WAITERS, TableOpts ++ [{keypos, #client.timestamp}]),
+    ets:new(?OPENING, [public, named_table, set]),
+    ets:new(?SERVERS, [public, named_table, set]),
+    ets:insert(?SERVERS, get_servers_from_env("COUCHDB_QUERY_SERVER_")),
+    ets:insert(?SERVERS, get_servers_from_env("COUCHDB_NATIVE_QUERY_SERVER_")),
+    ets:insert(?SERVERS, [{"QUERY", {mango_native_proc, start_link, []}}]),
+    maybe_configure_erlang_native_servers(),
+
+    {ok, #state{
+        config = get_proc_config(),
+        counts = dict:new(),
+        threshold_ts = os:timestamp(),
+        hard_limit = get_hard_limit(),
+        soft_limit = get_soft_limit()
+    }}.
+
+
+terminate(_Reason, _State) ->
+    ets:foldl(fun(#proc_int{pid=P}, _) ->
+        couch_util:shutdown_sync(P)
+    end, 0, ?PROCS),
+    ok.
+
+
+handle_call(get_proc_count, _From, State) ->
+    NumProcs = ets:info(?PROCS, size),
+    NumOpening = ets:info(?OPENING, size),
+    {reply, NumProcs + NumOpening, State};
+
+handle_call(get_stale_proc_count, _From, State) ->
+    #state{threshold_ts = T0} = State,
+    MatchSpec = [{#proc_int{t0='$1', _='_'}, [{'<', '$1', {T0}}], [true]}],
+    {reply, ets:select_count(?PROCS, MatchSpec), State};
+
+handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
+    LangStr = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+    Lang = couch_util:to_binary(LangStr),
+    Client = #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey},
+    add_waiting_client(Client),
+    {noreply, flush_waiters(State, Lang)};
+
+handle_call({get_proc, LangStr}, From, State) ->
+    Lang = couch_util:to_binary(LangStr),
+    Client = #client{from=From, lang=Lang},
+    add_waiting_client(Client),
+    {noreply, flush_waiters(State, Lang)};
+
+handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) ->
+    erlang:demonitor(Ref, [flush]),
+    NewState = case ets:lookup(?PROCS, Proc#proc.pid) of
+        [#proc_int{}=ProcInt] ->
+            return_proc(State, ProcInt);
+        [] ->
+            % Proc must've died and we already
+            % cleared it out of the table in
+            % the handle_info clause.
+            State
+    end,
+    {reply, true, NewState};
+
+handle_call(set_threshold_ts, _From, State) ->
+    FoldFun = fun
+        (#proc_int{client = undefined} = Proc, StateAcc) ->
+            remove_proc(StateAcc, Proc);
+        (_, StateAcc) ->
+            StateAcc
+    end,
+    NewState = ets:foldl(FoldFun, State, ?PROCS),
+    {reply, ok, NewState#state{threshold_ts = os:timestamp()}};
+
+handle_call(terminate_stale_procs, _From, #state{threshold_ts = Ts1} = State) ->
+    FoldFun = fun
+        (#proc_int{client = undefined, t0 = Ts2} = Proc, StateAcc) ->
+            case Ts1 > Ts2 of
+                true ->
+                    remove_proc(StateAcc, Proc);
+                false ->
+                    StateAcc
+            end;
+        (_, StateAcc) ->
+            StateAcc
+    end,
+    NewState = ets:foldl(FoldFun, State, ?PROCS),
+    {reply, ok, NewState};
+
+handle_call(_Call, _From, State) ->
+    {reply, ignored, State}.
+
+
+handle_cast({os_proc_idle, Pid}, #state{counts=Counts}=State) ->
+    NewState = case ets:lookup(?PROCS, Pid) of
+        [#proc_int{client=undefined, lang=Lang}=Proc] ->
+            case dict:find(Lang, Counts) of
+                {ok, Count} when Count >= State#state.soft_limit ->
+                    couch_log:info("Closing idle OS Process: ~p", [Pid]),
+                    remove_proc(State, Proc);
+                {ok, _} ->
+                    State
+            end;
+        _ ->
+            State
+    end,
+    {noreply, NewState};
+
+handle_cast(reload_config, State) ->
+    NewState = State#state{
+        config = get_proc_config(),
+        hard_limit = get_hard_limit(),
+        soft_limit = get_soft_limit()
+    },
+    maybe_configure_erlang_native_servers(),
+    {noreply, flush_waiters(NewState)};
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+
+handle_info(shutdown, State) ->
+    {stop, shutdown, State};
+
+handle_info({'EXIT', Pid, {spawn_ok, Proc0, {ClientPid,_} = From}}, State) ->
+    ets:delete(?OPENING, Pid),
+    link(Proc0#proc_int.pid),
+    Proc = assign_proc(ClientPid, Proc0),
+    gen_server:reply(From, {ok, Proc, State#state.config}),
+    {noreply, State};
+
+handle_info({'EXIT', Pid, spawn_error}, State) ->
+    [{Pid, #client{lang=Lang}}] = ets:lookup(?OPENING, Pid),
+    ets:delete(?OPENING, Pid),
+    NewState = State#state{
+        counts = dict:update_counter(Lang, -1, State#state.counts)
+    },
+    {noreply, flush_waiters(NewState, Lang)};
+
+handle_info({'EXIT', Pid, Reason}, State) ->
+    couch_log:info("~p ~p died ~p", [?MODULE, Pid, Reason]),
+    case ets:lookup(?PROCS, Pid) of
+        [#proc_int{} = Proc] ->
+            NewState = remove_proc(State, Proc),
+            {noreply, flush_waiters(NewState, Proc#proc_int.lang)};
+        [] ->
+            {noreply, State}
+    end;
+
+handle_info({'DOWN', Ref, _, _, _Reason}, State0) ->
+    case ets:match_object(?PROCS, #proc_int{client=Ref, _='_'}) of
+        [#proc_int{} = Proc] ->
+            {noreply, return_proc(State0, Proc)};
+        [] ->
+            {noreply, State0}
+    end;
+
+
+handle_info(restart_config_listener, State) ->
+    ok = config:listen_for_changes(?MODULE, nil),
+    {noreply, State};
+
+handle_info(_Msg, State) ->
+    {noreply, State}.
+
+
+code_change(_OldVsn, #state{}=State, _Extra) ->
+    {ok, State}.
+
+handle_config_terminate(_, stop, _) ->
+    ok;
+handle_config_terminate(_Server, _Reason, _State) ->
+    gen_server:cast(?MODULE, reload_config),
+    erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener).
+
+handle_config_change("native_query_servers", _, _, _, _) ->
+    gen_server:cast(?MODULE, reload_config),
+    {ok, undefined};
+handle_config_change("query_server_config", _, _, _, _) ->
+    gen_server:cast(?MODULE, reload_config),
+    {ok, undefined};
+handle_config_change(_, _, _, _, _) ->
+    {ok, undefined}.
+
+
+find_proc(#client{lang = Lang, ddoc_key = undefined}) ->
+    Pred = fun(_) ->
+        true
+    end,
+    find_proc(Lang, Pred);
+find_proc(#client{lang = Lang, ddoc = DDoc, ddoc_key = DDocKey} = Client) ->
+    Pred = fun(#proc_int{ddoc_keys = DDocKeys}) ->
+        lists:member(DDocKey, DDocKeys)
+    end,
+    case find_proc(Lang, Pred) of
+        not_found ->
+            case find_proc(Client#client{ddoc_key=undefined}) of
+                {ok, Proc} ->
+                    teach_ddoc(DDoc, DDocKey, Proc);
+                Else ->
+                    Else
+            end;
+        Else ->
+            Else
+    end.
+
+find_proc(Lang, Fun) ->
+    try iter_procs(Lang, Fun)
+    catch error:Reason ->
+        StackTrace = erlang:get_stacktrace(),
+        couch_log:error("~p ~p ~p", [?MODULE, Reason, StackTrace]),
+        {error, Reason}
+    end.
+
+
+iter_procs(Lang, Fun) when is_binary(Lang) ->
+    Pattern = #proc_int{lang=Lang, client=undefined, _='_'},
+    MSpec = [{Pattern, [], ['$_']}],
+    case ets:select_reverse(?PROCS, MSpec, 25) of
+        '$end_of_table' ->
+            not_found;
+        Continuation ->
+            iter_procs_int(Continuation, Fun)
+    end.
+
+
+iter_procs_int({[], Continuation0}, Fun) ->
+    case ets:select_reverse(Continuation0) of
+        '$end_of_table' ->
+            not_found;
+        Continuation1 ->
+            iter_procs_int(Continuation1, Fun)
+    end;
+iter_procs_int({[Proc | Rest], Continuation}, Fun) ->
+    case Fun(Proc) of
+        true ->
+            {ok, Proc};
+        false ->
+            iter_procs_int({Rest, Continuation}, Fun)
+    end.
+
+
+spawn_proc(State, Client) ->
+    Pid = spawn_link(?MODULE, new_proc, [Client]),
+    ets:insert(?OPENING, {Pid, Client}),
+    Counts = State#state.counts,
+    Lang = Client#client.lang,
+    State#state{
+        counts = dict:update_counter(Lang, 1, Counts)
+    }.
+
+
+new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) ->
+    #client{from=From, lang=Lang} = Client,
+    Resp = try
+        case new_proc_int(From, Lang) of
+            {ok, Proc} ->
+                {spawn_ok, Proc, From};
+            Error ->
+                gen_server:reply(From, {error, Error}),
+                spawn_error
+        end
+    catch _:_ ->
+        spawn_error
+    end,
+    exit(Resp);
+
+new_proc(Client) ->
+    #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey} = Client,
+    Resp = try
+        case new_proc_int(From, Lang) of
+        {ok, NewProc} ->
+            {ok, Proc} = teach_ddoc(DDoc, DDocKey, NewProc),
+            {spawn_ok, Proc, From};
+        Error ->
+            gen_server:reply(From, {error, Error}),
+            spawn_error
+        end
+    catch _:_ ->
+        spawn_error
+    end,
+    exit(Resp).
+
+split_string_if_longer(String, Pos) ->
+    case length(String) > Pos of
+        true -> lists:split(Pos, String);
+        false -> false
+    end.
+
+split_by_char(String, Char) ->
+    %% 17.5 doesn't have string:split
+    %% the function doesn't handle errors
+    %% it is designed to be used only in specific context
+    Pos = string:chr(String, Char),
+    {Key, [_Eq | Value]} = lists:split(Pos - 1, String),
+    {Key, Value}.
+
+get_servers_from_env(Spec) ->
+    SpecLen = length(Spec),
+    % loop over os:getenv(), match SPEC_
+    lists:filtermap(fun(EnvStr) ->
+        case split_string_if_longer(EnvStr, SpecLen) of
+            {Spec, Rest} ->
+                {true, split_by_char(Rest, $=)};
+            _ ->
+                false
+        end
+    end, os:getenv()).
+
+get_query_server(LangStr) ->
+    case ets:lookup(?SERVERS, string:to_upper(LangStr)) of
+        [{_, Command}] -> Command;
+        _ -> undefined
+    end.
+
+native_query_server_enabled() ->
+    % 1. [native_query_server] enable_erlang_query_server = true | false
+    % 2. if [native_query_server] erlang == {couch_native_process, start_link, []} -> pretend true as well
+    NativeEnabled = config:get_boolean("native_query_servers", "enable_erlang_query_server", false),
+    NativeLegacyConfig = config:get("native_query_servers", "erlang", ""),
+    NativeLegacyEnabled = NativeLegacyConfig =:= "{couch_native_process, start_link, []}",
+    NativeEnabled orelse NativeLegacyEnabled.
+
+maybe_configure_erlang_native_servers() ->
+    case native_query_server_enabled() of
+        true ->
+           ets:insert(?SERVERS, [
+               {"ERLANG", {couch_js_native_process, start_link, []}}]);
+        _Else ->
+           ok
+    end.
+
+new_proc_int(From, Lang) when is_binary(Lang) ->
+    LangStr = binary_to_list(Lang),
+    case get_query_server(LangStr) of
+    undefined ->
+        gen_server:reply(From, {unknown_query_language, Lang});
+    {M, F, A} ->
+        {ok, Pid} = apply(M, F, A),
+        make_proc(Pid, Lang, M);
+    Command ->
+        {ok, Pid} = couch_js_os_process:start_link(Command),
+        make_proc(Pid, Lang, couch_js_os_process)
+    end.
+
+
+teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc_int{ddoc_keys=Keys}=Proc) ->
+    % send ddoc over the wire
+    % we only share the rev with the client we know to update code
+    % but it only keeps the latest copy, per each ddoc, around.
+    true = couch_js_query_servers:proc_prompt(
+        export_proc(Proc),
+        [<<"ddoc">>, <<"new">>, DDocId, couch_doc:to_json_obj(DDoc, [])]),
+    % we should remove any other ddocs keys for this docid
+    % because the query server overwrites without the rev
+    Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
+    % add ddoc to the proc
+    {ok, Proc#proc_int{ddoc_keys=[DDocKey|Keys2]}}.
+
+
+make_proc(Pid, Lang, Mod) when is_binary(Lang) ->
+    Proc = #proc_int{
+        lang = Lang,
+        pid = Pid,
+        prompt_fun = {Mod, prompt},
+        set_timeout_fun = {Mod, set_timeout},
+        stop_fun = {Mod, stop}
+    },
+    unlink(Pid),
+    {ok, Proc}.
+
+
+assign_proc(Pid, #proc_int{client=undefined}=Proc0) when is_pid(Pid) ->
+    Proc = Proc0#proc_int{client = erlang:monitor(process, Pid)},
+    ets:insert(?PROCS, Proc),
+    export_proc(Proc);
+assign_proc(#client{}=Client, #proc_int{client=undefined}=Proc) ->
+    {Pid, _} = Client#client.from,
+    assign_proc(Pid, Proc).
+
+
+return_proc(#state{} = State, #proc_int{} = ProcInt) ->
+    #proc_int{pid = Pid, lang = Lang} = ProcInt,
+    NewState = case is_process_alive(Pid) of true ->
+        case ProcInt#proc_int.t0 < State#state.threshold_ts of
+            true ->
+                remove_proc(State, ProcInt);
+            false ->
+                gen_server:cast(Pid, garbage_collect),
+                true = ets:update_element(?PROCS, Pid, [
+                    {#proc_int.client, undefined}
+                ]),
+                State
+        end;
+    false ->
+        remove_proc(State, ProcInt)
+    end,
+    flush_waiters(NewState, Lang).
+
+
+remove_proc(State, #proc_int{}=Proc) ->
+    ets:delete(?PROCS, Proc#proc_int.pid),
+    case is_process_alive(Proc#proc_int.pid) of true ->
+        unlink(Proc#proc_int.pid),
+        gen_server:cast(Proc#proc_int.pid, stop);
+    false ->
+        ok
+    end,
+    Counts = State#state.counts,
+    Lang = Proc#proc_int.lang,
+    State#state{
+        counts = dict:update_counter(Lang, -1, Counts)
+    }.
+
+
+-spec export_proc(#proc_int{}) -> #proc{}.
+export_proc(#proc_int{} = ProcInt) ->
+    ProcIntList = tuple_to_list(ProcInt),
+    ProcLen = record_info(size, proc),
+    [_ | Data] = lists:sublist(ProcIntList, ProcLen),
+    list_to_tuple([proc | Data]).
+
+
+flush_waiters(State) ->
+    dict:fold(fun(Lang, Count, StateAcc) ->
+        case Count < State#state.hard_limit of
+            true ->
+                flush_waiters(StateAcc, Lang);
+            false ->
+                StateAcc
+        end
+    end, State, State#state.counts).
+
+
+flush_waiters(State, Lang) ->
+    CanSpawn = can_spawn(State, Lang),
+    case get_waiting_client(Lang) of
+        #client{from = From} = Client ->
+            case find_proc(Client) of
+                {ok, ProcInt} ->
+                    Proc = assign_proc(Client, ProcInt),
+                    gen_server:reply(From, {ok, Proc, State#state.config}),
+                    remove_waiting_client(Client),
+                    flush_waiters(State, Lang);
+                {error, Error} ->
+                    gen_server:reply(From, {error, Error}),
+                    remove_waiting_client(Client),
+                    flush_waiters(State, Lang);
+                not_found when CanSpawn ->
+                    NewState = spawn_proc(State, Client),
+                    remove_waiting_client(Client),
+                    flush_waiters(NewState, Lang);
+                not_found ->
+                    State
+            end;
+        undefined ->
+            State
+    end.
+
+
+add_waiting_client(Client) ->
+    ets:insert(?WAITERS, Client#client{timestamp=os:timestamp()}).
+
+-spec get_waiting_client(Lang :: binary()) -> undefined | #client{}.
+get_waiting_client(Lang) ->
+    case ets:match_object(?WAITERS, #client{lang=Lang, _='_'}, 1) of
+        '$end_of_table' ->
+            undefined;
+        {[#client{}=Client], _} ->
+            Client
+    end.
+
+
+remove_waiting_client(#client{timestamp = Timestamp}) ->
+    ets:delete(?WAITERS, Timestamp).
+
+
+can_spawn(#state{hard_limit = HardLimit, counts = Counts}, Lang) ->
+    case dict:find(Lang, Counts) of
+        {ok, Count} -> Count < HardLimit;
+        error -> true
+    end.
+
+
+get_proc_config() ->
+    Limit = config:get("query_server_config", "reduce_limit", "true"),
+    Timeout = config:get("couchdb", "os_process_timeout", "5000"),
+    {[
+        {<<"reduce_limit">>, list_to_atom(Limit)},
+        {<<"timeout">>, list_to_integer(Timeout)}
+    ]}.
+
+
+get_hard_limit() ->
+    LimStr = config:get("query_server_config", "os_process_limit", "100"),
+    list_to_integer(LimStr).
+
+
+get_soft_limit() ->
+    LimStr = config:get("query_server_config", "os_process_soft_limit", "100"),
+    list_to_integer(LimStr).
diff --git a/src/couch_js/src/couch_js_query_servers.erl b/src/couch_js/src/couch_js_query_servers.erl
new file mode 100644
index 0000000..12dc864
--- /dev/null
+++ b/src/couch_js/src/couch_js_query_servers.erl
@@ -0,0 +1,683 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_js_query_servers).
+
+-export([try_compile/4]).
+-export([start_doc_map/3, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]).
+-export([reduce/3, rereduce/3,validate_doc_update/5]).
+-export([filter_docs/5]).
+-export([filter_view/3]).
+-export([finalize/2]).
+-export([rewrite/3]).
+
+-export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]).
+
+% For 210-os-proc-pool.t
+-export([get_os_process/1, get_ddoc_process/2, ret_os_process/1]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-define(SUMERROR, <<"The _sum function requires that map values be numbers, "
+    "arrays of numbers, or objects. Objects cannot be mixed with other "
+    "data structures. Objects can be arbitrarily nested, provided that the values "
+    "for all fields are themselves numbers, arrays of numbers, or objects.">>).
+
+-define(STATERROR, <<"The _stats function requires that map values be numbers "
+    "or arrays of numbers, not '~p'">>).
+
+
+try_compile(Proc, FunctionType, FunctionName, FunctionSource) ->
+    try
+        proc_prompt(Proc, [<<"add_fun">>, FunctionSource]),
+        ok
+    catch
+        {compilation_error, E} ->
+            Fmt = "Compilation of the ~s function in the '~s' view failed: ~s",
+            Msg = io_lib:format(Fmt, [FunctionType, FunctionName, E]),
+            throw({compilation_error, Msg});
+        {os_process_error, {exit_status, ExitStatus}} ->
+            Fmt = "Compilation of the ~s function in the '~s' view failed with exit status: ~p",
+            Msg = io_lib:format(Fmt, [FunctionType, FunctionName, ExitStatus]),
+            throw({compilation_error, Msg})
+    end.
+
+start_doc_map(Lang, Functions, Lib) ->
+    Proc = get_os_process(Lang),
+    case Lib of
+    {[]} -> ok;
+    Lib ->
+        true = proc_prompt(Proc, [<<"add_lib">>, Lib])
+    end,
+    lists:foreach(fun(FunctionSource) ->
+        true = proc_prompt(Proc, [<<"add_fun">>, FunctionSource])
+    end, Functions),
+    {ok, Proc}.
+
+map_doc_raw(Proc, Doc) ->
+    Json = couch_doc:to_json_obj(Doc, []),
+    {ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}.
+
+
+stop_doc_map(nil) ->
+    ok;
+stop_doc_map(Proc) ->
+    ok = ret_os_process(Proc).
+
+group_reductions_results([]) ->
+    [];
+group_reductions_results(List) ->
+    {Heads, Tails} = lists:foldl(
+        fun([H|T], {HAcc,TAcc}) ->
+            {[H|HAcc], [T|TAcc]}
+        end, {[], []}, List),
+    case Tails of
+    [[]|_] -> % no tails left
+        [Heads];
+    _ ->
+     [Heads | group_reductions_results(Tails)]
+    end.
+
+finalize(<<"_approx_count_distinct",_/binary>>, Reduction) ->
+    true = hyper:is_hyper(Reduction),
+    {ok, round(hyper:card(Reduction))};
+finalize(<<"_stats",_/binary>>, Unpacked) ->
+    {ok, pack_stats(Unpacked)};
+finalize(_RedSrc, Reduction) ->
+    {ok, Reduction}.
+
+rereduce(_Lang, [], _ReducedValues) ->
+    {ok, []};
+rereduce(Lang, RedSrcs, ReducedValues) ->
+    Grouped = group_reductions_results(ReducedValues),
+    Results = lists:zipwith(
+        fun
+        (<<"_", _/binary>> = FunSrc, Values) ->
+            {ok, [Result]} = builtin_reduce(rereduce, [FunSrc], [[[], V] || V <- Values], []),
+            Result;
+        (FunSrc, Values) ->
+            os_rereduce(Lang, [FunSrc], Values)
+        end, RedSrcs, Grouped),
+    {ok, Results}.
+
+reduce(_Lang, [], _KVs) ->
+    {ok, []};
+reduce(Lang, RedSrcs, KVs) ->
+    {OsRedSrcs, BuiltinReds} = lists:partition(fun
+        (<<"_", _/binary>>) -> false;
+        (_OsFun) -> true
+    end, RedSrcs),
+    {ok, OsResults} = os_reduce(Lang, OsRedSrcs, KVs),
+    {ok, BuiltinResults} = builtin_reduce(reduce, BuiltinReds, KVs, []),
+    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, []).
+
+
+recombine_reduce_results([], [], [], Acc) ->
+    {ok, lists:reverse(Acc)};
+recombine_reduce_results([<<"_", _/binary>>|RedSrcs], OsResults, [BRes|BuiltinResults], Acc) ->
+    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, [BRes|Acc]);
+recombine_reduce_results([_OsFun|RedSrcs], [OsR|OsResults], BuiltinResults, Acc) ->
+    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, [OsR|Acc]).
+
+os_reduce(_Lang, [], _KVs) ->
+    {ok, []};
+os_reduce(Lang, OsRedSrcs, KVs) ->
+    Proc = get_os_process(Lang),
+    OsResults = try proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]) of
+        [true, Reductions] -> Reductions
+    catch
+        throw:{reduce_overflow_error, Msg} ->
+            [{[{reduce_overflow_error, Msg}]} || _ <- OsRedSrcs]
+    after
+        ok = ret_os_process(Proc)
+    end,
+    {ok, OsResults}.
+
+os_rereduce(Lang, OsRedSrcs, KVs) ->
+    case get_overflow_error(KVs) of
+        undefined ->
+            Proc = get_os_process(Lang),
+            try proc_prompt(Proc, [<<"rereduce">>, OsRedSrcs, KVs]) of
+                [true, [Reduction]] -> Reduction
+            catch
+                throw:{reduce_overflow_error, Msg} ->
+                    {[{reduce_overflow_error, Msg}]}
+            after
+                ok = ret_os_process(Proc)
+            end;
+        Error ->
+            Error
+    end.
+
+
+get_overflow_error([]) ->
+    undefined;
+get_overflow_error([{[{reduce_overflow_error, _}]} = Error | _]) ->
+    Error;
+get_overflow_error([_ | Rest]) ->
+    get_overflow_error(Rest).
+
+
+builtin_reduce(_Re, [], _KVs, Acc) ->
+    {ok, lists:reverse(Acc)};
+builtin_reduce(Re, [<<"_sum",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Sum = builtin_sum_rows(KVs, 0),
+    Red = check_sum_overflow(?term_size(KVs), ?term_size(Sum), Sum),
+    builtin_reduce(Re, BuiltinReds, KVs, [Red|Acc]);
+builtin_reduce(reduce, [<<"_count",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Count = length(KVs),
+    builtin_reduce(reduce, BuiltinReds, KVs, [Count|Acc]);
+builtin_reduce(rereduce, [<<"_count",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Count = builtin_sum_rows(KVs, 0),
+    builtin_reduce(rereduce, BuiltinReds, KVs, [Count|Acc]);
+builtin_reduce(Re, [<<"_stats",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Stats = builtin_stats(Re, KVs),
+    builtin_reduce(Re, BuiltinReds, KVs, [Stats|Acc]);
+builtin_reduce(Re, [<<"_approx_count_distinct",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Distinct = approx_count_distinct(Re, KVs),
+    builtin_reduce(Re, BuiltinReds, KVs, [Distinct|Acc]).
+
+
+builtin_sum_rows([], Acc) ->
+    Acc;
+builtin_sum_rows([[_Key, Value] | RestKVs], Acc) ->
+    try sum_values(Value, Acc) of
+        NewAcc ->
+            builtin_sum_rows(RestKVs, NewAcc)
+    catch
+        throw:{builtin_reduce_error, Obj} ->
+            Obj;
+        throw:{invalid_value, Reason, Cause} ->
+            {[{<<"error">>, <<"builtin_reduce_error">>},
+                {<<"reason">>, Reason}, {<<"caused_by">>, Cause}]}
+    end.
+
+
+sum_values(Value, Acc) when is_number(Value), is_number(Acc) ->
+    Acc + Value;
+sum_values(Value, Acc) when is_list(Value), is_list(Acc) ->
+    sum_arrays(Acc, Value);
+sum_values(Value, Acc) when is_number(Value), is_list(Acc) ->
+    sum_arrays(Acc, [Value]);
+sum_values(Value, Acc) when is_list(Value), is_number(Acc) ->
+    sum_arrays([Acc], Value);
+sum_values({Props}, Acc) ->
+    case lists:keyfind(<<"error">>, 1, Props) of
+        {<<"error">>, <<"builtin_reduce_error">>} ->
+            throw({builtin_reduce_error, {Props}});
+        false ->
+            ok
+    end,
+    case Acc of
+        0 ->
+            {Props};
+        {AccProps} ->
+            {sum_objects(lists:sort(Props), lists:sort(AccProps))}
+    end;
+sum_values(Else, _Acc) ->
+    throw_sum_error(Else).
+
+sum_objects([{K1, V1} | Rest1], [{K1, V2} | Rest2]) ->
+    [{K1, sum_values(V1, V2)} | sum_objects(Rest1, Rest2)];
+sum_objects([{K1, V1} | Rest1], [{K2, V2} | Rest2]) when K1 < K2 ->
+    [{K1, V1} | sum_objects(Rest1, [{K2, V2} | Rest2])];
+sum_objects([{K1, V1} | Rest1], [{K2, V2} | Rest2]) when K1 > K2 ->
+    [{K2, V2} | sum_objects([{K1, V1} | Rest1], Rest2)];
+sum_objects([], Rest) ->
+    Rest;
+sum_objects(Rest, []) ->
+    Rest.
+
+sum_arrays([], []) ->
+    [];
+sum_arrays([_|_]=Xs, []) ->
+    Xs;
+sum_arrays([], [_|_]=Ys) ->
+    Ys;
+sum_arrays([X|Xs], [Y|Ys]) when is_number(X), is_number(Y) ->
+    [X+Y | sum_arrays(Xs,Ys)];
+sum_arrays(Else, _) ->
+    throw_sum_error(Else).
+
+check_sum_overflow(InSize, OutSize, Sum) ->
+    Overflowed = OutSize > 4906 andalso OutSize * 2 > InSize,
+    case config:get("query_server_config", "reduce_limit", "true") of
+        "true" when Overflowed ->
+            Msg = log_sum_overflow(InSize, OutSize),
+            {[
+                {<<"error">>, <<"builtin_reduce_error">>},
+                {<<"reason">>, Msg}
+            ]};
+        "log" when Overflowed ->
+            log_sum_overflow(InSize, OutSize),
+            Sum;
+        _ ->
+            Sum
+    end.
+
+log_sum_overflow(InSize, OutSize) ->
+    Fmt = "Reduce output must shrink more rapidly: "
+            "input size: ~b "
+            "output size: ~b",
+    Msg = iolist_to_binary(io_lib:format(Fmt, [InSize, OutSize])),
+    couch_log:error(Msg, []),
+    Msg.
+
+builtin_stats(_, []) ->
+    {0, 0, 0, 0, 0};
+builtin_stats(_, [[_,First]|Rest]) ->
+    lists:foldl(fun([_Key, Value], Acc) ->
+        stat_values(Value, Acc)
+    end, build_initial_accumulator(First), Rest).
+
+stat_values(Value, Acc) when is_list(Value), is_list(Acc) ->
+    lists:zipwith(fun stat_values/2, Value, Acc);
+stat_values({PreRed}, Acc) when is_list(PreRed) ->
+    stat_values(unpack_stats({PreRed}), Acc);
+stat_values(Value, Acc) when is_number(Value) ->
+    stat_values({Value, 1, Value, Value, Value*Value}, Acc);
+stat_values(Value, Acc) when is_number(Acc) ->
+    stat_values(Value, {Acc, 1, Acc, Acc, Acc*Acc});
+stat_values(Value, Acc) when is_tuple(Value), is_tuple(Acc) ->
+    {Sum0, Cnt0, Min0, Max0, Sqr0} = Value,
+    {Sum1, Cnt1, Min1, Max1, Sqr1} = Acc,
+    {
+      Sum0 + Sum1,
+      Cnt0 + Cnt1,
+      erlang:min(Min0, Min1),
+      erlang:max(Max0, Max1),
+      Sqr0 + Sqr1
+    };
+stat_values(Else, _Acc) ->
+    throw_stat_error(Else).
+
+build_initial_accumulator(L) when is_list(L) ->
+    [build_initial_accumulator(X) || X <- L];
+build_initial_accumulator(X) when is_number(X) ->
+    {X, 1, X, X, X*X};
+build_initial_accumulator({_, _, _, _, _} = AlreadyUnpacked) ->
+    AlreadyUnpacked;
+build_initial_accumulator({Props}) ->
+    unpack_stats({Props});
+build_initial_accumulator(Else) ->
+    Msg = io_lib:format("non-numeric _stats input: ~w", [Else]),
+    throw({invalid_value, iolist_to_binary(Msg)}).
+
+unpack_stats({PreRed}) when is_list(PreRed) ->
+    {
+      get_number(<<"sum">>, PreRed),
+      get_number(<<"count">>, PreRed),
+      get_number(<<"min">>, PreRed),
+      get_number(<<"max">>, PreRed),
+      get_number(<<"sumsqr">>, PreRed)
+    }.
+
+
+pack_stats({Sum, Cnt, Min, Max, Sqr}) ->
+    {[{<<"sum">>,Sum}, {<<"count">>,Cnt}, {<<"min">>,Min}, {<<"max">>,Max}, {<<"sumsqr">>,Sqr}]};
+pack_stats({Packed}) ->
+    % Legacy code path before we had the finalize operation
+    {Packed};
+pack_stats(Stats) when is_list(Stats) ->
+    lists:map(fun pack_stats/1, Stats).
+
+get_number(Key, Props) ->
+    case couch_util:get_value(Key, Props) of
+    X when is_number(X) ->
+        X;
+    undefined when is_binary(Key) ->
+        get_number(binary_to_atom(Key, latin1), Props);
+    undefined ->
+        Msg = io_lib:format("user _stats input missing required field ~s (~p)",
+            [Key, Props]),
+        throw({invalid_value, iolist_to_binary(Msg)});
+    Else ->
+        Msg = io_lib:format("non-numeric _stats input received for ~s: ~w",
+            [Key, Else]),
+        throw({invalid_value, iolist_to_binary(Msg)})
+    end.
+
+% TODO allow customization of precision in the ddoc.
+approx_count_distinct(reduce, KVs) ->
+    lists:foldl(fun([[Key, _Id], _Value], Filter) ->
+        hyper:insert(term_to_binary(Key), Filter)
+    end, hyper:new(11), KVs);
+approx_count_distinct(rereduce, Reds) ->
+    hyper:union([Filter || [_, Filter] <- Reds]).
+
+% use the function stored in ddoc.validate_doc_update to test an update.
+-spec validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) -> ok when
+    DDoc    :: ddoc(),
+    EditDoc :: doc(),
+    DiskDoc :: doc() | nil,
+    Ctx     :: user_ctx(),
+    SecObj  :: sec_obj().
+
+validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) ->
+    JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
+    JsonDiskDoc = json_doc(DiskDoc),
+    Resp = ddoc_prompt(
+        DDoc,
+        [<<"validate_doc_update">>],
+        [JsonEditDoc, JsonDiskDoc, Ctx, SecObj]
+    ),
+    if Resp == 1 -> ok; true ->
+        couch_stats:increment_counter([couchdb, query_server, vdu_rejects], 1)
+    end,
+    case Resp of
+        RespCode when RespCode =:= 1; RespCode =:= ok; RespCode =:= true ->
+            ok;
+        {[{<<"forbidden">>, Message}]} ->
+            throw({forbidden, Message});
+        {[{<<"unauthorized">>, Message}]} ->
+            throw({unauthorized, Message});
+        {[{_, Message}]} ->
+            throw({unknown_error, Message});
+        Message when is_binary(Message) ->
+            throw({unknown_error, Message})
+    end.
+
+
+rewrite(Req, Db, DDoc) ->
+    Fields = [F || F <- chttpd_external:json_req_obj_fields(),
+              F =/= <<"info">>, F =/= <<"form">>,
+              F =/= <<"uuid">>, F =/= <<"id">>],
+    JsonReq = chttpd_external:json_req_obj(Req, Db, null, Fields),
+    case ddoc_prompt(DDoc, [<<"rewrites">>], [JsonReq]) of
+        {[{<<"forbidden">>, Message}]} ->
+            throw({forbidden, Message});
+        {[{<<"unauthorized">>, Message}]} ->
+            throw({unauthorized, Message});
+        [<<"no_dispatch_rule">>] ->
+            undefined;
+        [<<"ok">>, {V}=Rewrite] when is_list(V) ->
+            ok = validate_rewrite_response(Rewrite),
+            Rewrite;
+        [<<"ok">>, _]  ->
+            throw_rewrite_error(<<"bad rewrite">>);
+        V ->
+            couch_log:error("bad rewrite return ~p", [V]),
+            throw({unknown_error, V})
+    end.
+
+validate_rewrite_response({Fields}) when is_list(Fields) ->
+    validate_rewrite_response_fields(Fields).
+
+validate_rewrite_response_fields([{Key, Value} | Rest]) ->
+    validate_rewrite_response_field(Key, Value),
+    validate_rewrite_response_fields(Rest);
+validate_rewrite_response_fields([]) ->
+    ok.
+
+validate_rewrite_response_field(<<"method">>, Method) when is_binary(Method) ->
+    ok;
+validate_rewrite_response_field(<<"method">>, _) ->
+    throw_rewrite_error(<<"bad method">>);
+validate_rewrite_response_field(<<"path">>, Path) when is_binary(Path) ->
+    ok;
+validate_rewrite_response_field(<<"path">>, _) ->
+    throw_rewrite_error(<<"bad path">>);
+validate_rewrite_response_field(<<"body">>, Body) when is_binary(Body) ->
+    ok;
+validate_rewrite_response_field(<<"body">>, _) ->
+    throw_rewrite_error(<<"bad body">>);
+validate_rewrite_response_field(<<"headers">>, {Props}=Headers) when is_list(Props) ->
+    validate_object_fields(Headers);
+validate_rewrite_response_field(<<"headers">>, _) ->
+    throw_rewrite_error(<<"bad headers">>);
+validate_rewrite_response_field(<<"query">>, {Props}=Query) when is_list(Props) ->
+    validate_object_fields(Query);
+validate_rewrite_response_field(<<"query">>, _) ->
+    throw_rewrite_error(<<"bad query">>);
+validate_rewrite_response_field(<<"code">>, Code) when is_integer(Code) andalso Code >= 200 andalso Code < 600 ->
+    ok;
+validate_rewrite_response_field(<<"code">>, _) ->
+    throw_rewrite_error(<<"bad code">>);
+validate_rewrite_response_field(K, V) ->
+    couch_log:debug("unknown rewrite field ~p=~p", [K, V]),
+    ok.
+
+validate_object_fields({Props}) when is_list(Props) ->
+    lists:foreach(fun
+        ({Key, Value}) when is_binary(Key) andalso is_binary(Value) ->
+            ok;
+        ({Key, Value}) ->
+            Reason = io_lib:format(
+                "object key/value must be strings ~p=~p", [Key, Value]),
+            throw_rewrite_error(Reason);
+        (Value) ->
+            throw_rewrite_error(io_lib:format("bad value ~p", [Value]))
+    end, Props).
+
+
+throw_rewrite_error(Reason) when is_list(Reason)->
+    throw_rewrite_error(iolist_to_binary(Reason));
+throw_rewrite_error(Reason) when is_binary(Reason) ->
+    throw({rewrite_error, Reason}).
+
+
+json_doc_options() ->
+    json_doc_options([]).
+
+json_doc_options(Options) ->
+    Limit = config:get_integer("query_server_config", "revs_limit", 20),
+    [{revs, Limit} | Options].
+
+json_doc(Doc) ->
+    json_doc(Doc, json_doc_options()).
+
+json_doc(nil, _) ->
+    null;
+json_doc(Doc, Options) ->
+    couch_doc:to_json_obj(Doc, Options).
+
+filter_view(DDoc, VName, Docs) ->
+    Options = json_doc_options(),
+    JsonDocs = [json_doc(Doc, Options) || Doc <- Docs],
+    [true, Passes] = ddoc_prompt(DDoc, [<<"views">>, VName, <<"map">>], [JsonDocs]),
+    {ok, Passes}.
+
+filter_docs(Req, Db, DDoc, FName, Docs) ->
+    JsonReq = case Req of
+        {json_req, JsonObj} ->
+            JsonObj;
+        #httpd{} = HttpReq ->
+            couch_httpd_external:json_req_obj(HttpReq, Db)
+    end,
+    Options = json_doc_options(),
+    JsonDocs = [json_doc(Doc, Options) || Doc <- Docs],
+    [true, Passes] = ddoc_prompt(DDoc, [<<"filters">>, FName],
+        [JsonDocs, JsonReq]),
+    {ok, Passes}.
+
+ddoc_proc_prompt({Proc, DDocId}, FunPath, Args) ->
+    proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]).
+
+ddoc_prompt(DDoc, FunPath, Args) ->
+    with_ddoc_proc(DDoc, fun({Proc, DDocId}) ->
+        proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args])
+    end).
+
+with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) ->
+    Rev = couch_doc:rev_to_str({Start, DiskRev}),
+    DDocKey = {DDocId, Rev},
+    Proc = get_ddoc_process(DDoc, DDocKey),
+    try Fun({Proc, DDocId})
+    after
+        ok = ret_os_process(Proc)
+    end.
+
+proc_prompt(Proc, Args) ->
+     case proc_prompt_raw(Proc, Args) of
+     {json, Json} ->
+         ?JSON_DECODE(Json);
+     EJson ->
+         EJson
+     end.
+
+proc_prompt_raw(#proc{prompt_fun = {Mod, Func}} = Proc, Args) ->
+    apply(Mod, Func, [Proc#proc.pid, Args]).
+
+raw_to_ejson({json, Json}) ->
+    ?JSON_DECODE(Json);
+raw_to_ejson(EJson) ->
+    EJson.
+
+proc_stop(Proc) ->
+    {Mod, Func} = Proc#proc.stop_fun,
+    apply(Mod, Func, [Proc#proc.pid]).
+
+proc_set_timeout(Proc, Timeout) ->
+    {Mod, Func} = Proc#proc.set_timeout_fun,
+    apply(Mod, Func, [Proc#proc.pid, Timeout]).
+
+get_os_process_timeout() ->
+    list_to_integer(config:get("couchdb", "os_process_timeout", "5000")).
+
+get_ddoc_process(#doc{} = DDoc, DDocKey) ->
+    % remove this case statement
+    case gen_server:call(couch_js_proc_manager, {get_proc, DDoc, DDocKey}, get_os_process_timeout()) of
+    {ok, Proc, {QueryConfig}} ->
+        % process knows the ddoc
+        case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
+        true ->
+            proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),
+            Proc;
+        _ ->
+            catch proc_stop(Proc),
+            get_ddoc_process(DDoc, DDocKey)
+        end;
+    Error ->
+        throw(Error)
+    end.
+
+get_os_process(Lang) ->
+    case gen_server:call(couch_js_proc_manager, {get_proc, Lang}, get_os_process_timeout()) of
+    {ok, Proc, {QueryConfig}} ->
+        case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
+        true ->
+            proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),
+            Proc;
+        _ ->
+            catch proc_stop(Proc),
+            get_os_process(Lang)
+        end;
+    Error ->
+        throw(Error)
+    end.
+
+ret_os_process(Proc) ->
+    true = gen_server:call(couch_js_proc_manager, {ret_proc, Proc}, infinity),
+    catch unlink(Proc#proc.pid),
+    ok.
+
+throw_sum_error(Else) ->
+    throw({invalid_value, ?SUMERROR, Else}).
+
+throw_stat_error(Else) ->
+    throw({invalid_value, iolist_to_binary(io_lib:format(?STATERROR, [Else]))}).
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+builtin_sum_rows_negative_test() ->
+    A = [{[{<<"a">>, 1}]}, {[{<<"a">>, 2}]}, {[{<<"a">>, 3}]}],
+    E = {[{<<"error">>, <<"builtin_reduce_error">>}]},
+    ?assertEqual(E, builtin_sum_rows([["K", E]], [])),
+    % The below case is where the value is invalid, but no error because
+    % it's only one document.
+    ?assertEqual(A, builtin_sum_rows([["K", A]], [])),
+    {Result} = builtin_sum_rows([["K", A]], [1, 2, 3]),
+    ?assertEqual({<<"error">>, <<"builtin_reduce_error">>},
+        lists:keyfind(<<"error">>, 1, Result)).
+
+sum_values_test() ->
+    ?assertEqual(3, sum_values(1, 2)),
+    ?assertEqual([2,4,6], sum_values(1, [1,4,6])),
+    ?assertEqual([3,5,7], sum_values([3,2,4], [0,3,3])),
+    X = {[{<<"a">>,1}, {<<"b">>,[1,2]}, {<<"c">>, {[{<<"d">>,3}]}},
+            {<<"g">>,1}]},
+    Y = {[{<<"a">>,2}, {<<"b">>,3}, {<<"c">>, {[{<<"e">>, 5}]}},
+            {<<"f">>,1}, {<<"g">>,1}]},
+    Z = {[{<<"a">>,3}, {<<"b">>,[4,2]}, {<<"c">>, {[{<<"d">>,3},{<<"e">>,5}]}},
+            {<<"f">>,1}, {<<"g">>,2}]},
+    ?assertEqual(Z, sum_values(X, Y)),
+    ?assertEqual(Z, sum_values(Y, X)).
+
+sum_values_negative_test() ->
+    % invalid value
+    A = [{[{<<"a">>, 1}]}, {[{<<"a">>, 2}]}, {[{<<"a">>, 3}]}],
+    B = ["error 1", "error 2"],
+    C = [<<"error 3">>, <<"error 4">>],
+    KV = {[{<<"error">>, <<"builtin_reduce_error">>},
+        {<<"reason">>, ?SUMERROR}, {<<"caused_by">>, <<"some cause">>}]},
+    ?assertThrow({invalid_value, _, _}, sum_values(A, [1, 2, 3])),
+    ?assertThrow({invalid_value, _, _}, sum_values(A, 0)),
+    ?assertThrow({invalid_value, _, _}, sum_values(B, [1, 2])),
+    ?assertThrow({invalid_value, _, _}, sum_values(C, [0])),
+    ?assertThrow({builtin_reduce_error, KV}, sum_values(KV, [0])).
+
+stat_values_test() ->
+    ?assertEqual({1, 2, 0, 1, 1}, stat_values(1, 0)),
+    ?assertEqual({11, 2, 1, 10, 101}, stat_values(1, 10)),
+    ?assertEqual([{9, 2, 2, 7, 53},
+                  {14, 2, 3, 11, 130},
+                  {18, 2, 5, 13, 194}
+                 ], stat_values([2,3,5], [7,11,13])).
+
+reduce_stats_test() ->
+    ?assertEqual([
+        {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]}
+    ], test_reduce(<<"_stats">>, [[[null, key], 2]])),
+
+    ?assertEqual([[
+        {[{<<"sum">>,1},{<<"count">>,1},{<<"min">>,1},{<<"max">>,1},{<<"sumsqr">>,1}]},
+        {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]}
+    ]], test_reduce(<<"_stats">>, [[[null, key],[1,2]]])),
+
+    ?assertEqual(
+         {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]}
+    , element(2, finalize(<<"_stats">>, {2, 1, 2, 2, 4}))),
+
+    ?assertEqual([
+        {[{<<"sum">>,1},{<<"count">>,1},{<<"min">>,1},{<<"max">>,1},{<<"sumsqr">>,1}]},
+        {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]}
+    ], element(2, finalize(<<"_stats">>, [
+        {1, 1, 1, 1, 1},
+        {2, 1, 2, 2, 4}
+    ]))),
+
+    ?assertEqual([
+        {[{<<"sum">>,1},{<<"count">>,1},{<<"min">>,1},{<<"max">>,1},{<<"sumsqr">>,1}]},
+        {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]}
+    ], element(2, finalize(<<"_stats">>, [
+        {1, 1, 1, 1, 1},
+        {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]}
+    ]))),
+
+    ?assertEqual([
+        {[{<<"sum">>,1},{<<"count">>,1},{<<"min">>,1},{<<"max">>,1},{<<"sumsqr">>,1}]},
+        {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]}
+    ], element(2, finalize(<<"_stats">>, [
+        {[{<<"sum">>,1},{<<"count">>,1},{<<"min">>,1},{<<"max">>,1},{<<"sumsqr">>,1}]},
+        {2, 1, 2, 2, 4}
+    ]))),
+    ok.
+
+test_reduce(Reducer, KVs) ->
+    ?assertMatch({ok, _}, reduce(<<"javascript">>, [Reducer], KVs)),
+    {ok, Reduced} = reduce(<<"javascript">>, [Reducer], KVs),
+    {ok, Finalized} = finalize(Reducer, Reduced),
+    Finalized.
+
+-endif.
diff --git a/src/couch_js/src/couch_js_sup.erl b/src/couch_js/src/couch_js_sup.erl
new file mode 100644
index 0000000..e875461
--- /dev/null
+++ b/src/couch_js/src/couch_js_sup.erl
@@ -0,0 +1,45 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_js_sup).
+-behaviour(supervisor).
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1
+]).
+
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+init([]) ->
+    Flags = #{
+        strategy => one_for_one,
+        intensity => 50,
+        period => 3600
+    },
+    Children = [
+        #{
+            id => couch_js_proc_manager,
+            restart => permanent,
+            shutdown => brutal_kill,
+            start => {couch_js_proc_manager, start_link, []}
+        }
+    ],
+    {ok, {Flags, Children}}.