Posted to commits@couchdb.apache.org by da...@apache.org on 2019/09/23 17:22:41 UTC

[couchdb] branch prototype/fdb-layer-couch-eval updated (145895a -> 8d01cdf)

This is an automated email from the ASF dual-hosted git repository.

davisp pushed a change to branch prototype/fdb-layer-couch-eval
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


 discard 145895a  Start couch_js for couch_views tests
 discard 90a69ed  Update couch_views to use couch_eval
 discard 329d94c  Add tests for couch_js application
 discard 1da6ba1  Implement couch_js callbacks for couch_eval
 discard 2c4c167  Initial creation of couch_js application
 discard 3be1900  Add couch_eval abstraction layer
     add 2eb1dee  Implement setting and getting _revs_limit
     add 9c6b5cb  Make get_security and get_revs_limit calls consistent
     add 2051bd6  Check members after db is opened
     add 01a9228  Add revision stemming for interactive docs
     new bc6d9f3  Add couch_eval abstraction layer
     new 2a22467  Initial creation of couch_js application
     new 3e0691f  Implement couch_js callbacks for couch_eval
     new f672644  Add tests for couch_js application
     new 8d01cdf  Update couch_views to use couch_eval

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (145895a)
            \
             N -- N -- N   refs/heads/prototype/fdb-layer-couch-eval (8d01cdf)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/chttpd/src/chttpd_db.erl                       |   4 +-
 .../test/couch_views_trace_index_test.erl          |   2 +-
 src/fabric/src/fabric2_db.erl                      |  72 +++++---
 src/fabric/src/fabric2_fdb.erl                     |  16 +-
 src/fabric/test/fabric2_db_security_tests.erl      |  64 ++++---
 src/fabric/test/fabric2_rev_stemming.erl           | 204 +++++++++++++++++++++
 test/elixir/test/basics_test.exs                   |  17 ++
 7 files changed, 331 insertions(+), 48 deletions(-)
 create mode 100644 src/fabric/test/fabric2_rev_stemming.erl


[couchdb] 03/05: Implement couch_js callbacks for couch_eval

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/fdb-layer-couch-eval
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 3e0691f06a2b29f15cc53595b11235fd1db81097
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Tue Aug 20 13:06:46 2019 -0500

    Implement couch_js callbacks for couch_eval
---
 rel/overlay/etc/default.ini   |  3 +++
 src/couch_js/src/couch_js.erl | 51 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 54 insertions(+)

diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini
index 51d450b..68652ee 100644
--- a/rel/overlay/etc/default.ini
+++ b/rel/overlay/etc/default.ini
@@ -313,6 +313,9 @@ os_process_limit = 100
 ;query_limit = 268435456
 ;partition_query_limit = 268435456
 
+[couch_eval.languages]
+javascript = couch_js
+
 [mango]
 ; Set to true to disable the "index all fields" text index, which can lead
 ; to out of memory issues when users have documents with nested array fields.
diff --git a/src/couch_js/src/couch_js.erl b/src/couch_js/src/couch_js.erl
new file mode 100644
index 0000000..1bc0f19
--- /dev/null
+++ b/src/couch_js/src/couch_js.erl
@@ -0,0 +1,51 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_js).
+-behavior(couch_eval).
+
+
+-export([
+    acquire_map_context/1,
+    release_map_context/1,
+    map_docs/2
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(JS, <<"javascript">>).
+
+
+acquire_map_context(Opts) ->
+    #{
+        map_funs := MapFuns,
+        lib := Lib
+    } = Opts,
+    couch_js_query_servers:start_doc_map(?JS, MapFuns, Lib).
+
+
+release_map_context(Proc) ->
+    couch_js_query_servers:stop_doc_map(Proc).
+
+
+map_docs(Proc, Docs) ->
+    {ok, lists:map(fun(Doc) ->
+        {ok, RawResults} = couch_js_query_servers:map_doc_raw(Proc, Doc),
+        Results = couch_js_query_servers:raw_to_ejson(RawResults),
+        Tupled = lists:map(fun(ViewResult) ->
+            lists:map(fun([K, V]) -> {K, V} end, ViewResult)
+        end, Results),
+        {Doc#doc.id, Tupled}
+    end, Docs)}.
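
The `map_docs/2` implementation above tuples each raw `[Key, Value]` row before handing results back to `couch_eval`. A minimal, self-contained sketch of the resulting shape (the document id and emitted rows below are made-up sample data, not part of the commit):

-module(couch_js_map_docs_shape).
-export([example/0]).

% Illustrative only: mirrors the tupling done in couch_js:map_docs/2 on a
% hand-written "raw" result, to show the {DocId, [[{Key, Value}]]} structure
% that couch_eval callers receive. All literals are invented sample data.
example() ->
    DocId = <<"doc-1">>,
    % One inner list per map function; each row is a [Key, Value] pair as
    % produced by couch_js_query_servers:raw_to_ejson/1.
    Results = [[[<<"doc-1">>, 1], [<<"other-key">>, 2]]],
    Tupled = lists:map(fun(ViewResult) ->
        lists:map(fun([K, V]) -> {K, V} end, ViewResult)
    end, Results),
    {DocId, Tupled}.
    % => {<<"doc-1">>, [[{<<"doc-1">>, 1}, {<<"other-key">>, 2}]]}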


[couchdb] 01/05: Add couch_eval abstraction layer

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/fdb-layer-couch-eval
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit bc6d9f3609b1f11e23315cd260877320b27feeb3
Author: Garren Smith <ga...@gmail.com>
AuthorDate: Wed Aug 14 17:18:56 2019 +0200

    Add couch_eval abstraction layer
---
 rebar.config.script                   |  1 +
 rel/reltool.config                    |  2 +
 src/couch_eval/README.md              |  5 ++
 src/couch_eval/rebar.config           | 14 +++++
 src/couch_eval/src/couch_eval.app.src | 23 +++++++++
 src/couch_eval/src/couch_eval.erl     | 97 +++++++++++++++++++++++++++++++++++
 6 files changed, 142 insertions(+)

diff --git a/rebar.config.script b/rebar.config.script
index c1d519f..ba7b754 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -74,6 +74,7 @@ SubDirs = [
     "src/couch_log",
     "src/chttpd",
     "src/couch",
+    "src/couch_eval",
     "src/couch_event",
     "src/mem3",
     "src/couch_index",
diff --git a/rel/reltool.config b/rel/reltool.config
index 907b241..e2ae71c 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -40,6 +40,7 @@
         couch_plugins,
         couch_replicator,
         couch_stats,
+        couch_eval,
         couch_event,
         couch_peruser,
         couch_views,
@@ -93,6 +94,7 @@
     {app, config, [{incl_cond, include}]},
     {app, couch, [{incl_cond, include}]},
     {app, couch_epi, [{incl_cond, include}]},
+    {app, couch_eval, [{incl_cond, include}]},
     {app, couch_jobs, [{incl_cond, include}]},
     {app, couch_index, [{incl_cond, include}]},
     {app, couch_log, [{incl_cond, include}]},
diff --git a/src/couch_eval/README.md b/src/couch_eval/README.md
new file mode 100644
index 0000000..048a165
--- /dev/null
+++ b/src/couch_eval/README.md
@@ -0,0 +1,5 @@
+couch_eval
+=====
+
+An initial abstraction layer for evaluating user-provided code. So far
+this is only used by `couch_views` to provide map function support. Currently this is implemented in `couch_js` by reusing the existing `couchjs` mechanics.
diff --git a/src/couch_eval/rebar.config b/src/couch_eval/rebar.config
new file mode 100644
index 0000000..362c878
--- /dev/null
+++ b/src/couch_eval/rebar.config
@@ -0,0 +1,14 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{cover_enabled, true}.
+{cover_print_enabled, true}.
diff --git a/src/couch_eval/src/couch_eval.app.src b/src/couch_eval/src/couch_eval.app.src
new file mode 100644
index 0000000..87193d8
--- /dev/null
+++ b/src/couch_eval/src/couch_eval.app.src
@@ -0,0 +1,23 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_eval, [
+    {description, "An OTP application"},
+    {vsn, git},
+    {registered, []},
+    {applications, [
+        kernel,
+        stdlib,
+        couch_log,
+        config
+    ]}
+ ]}.
diff --git a/src/couch_eval/src/couch_eval.erl b/src/couch_eval/src/couch_eval.erl
new file mode 100644
index 0000000..23ca263
--- /dev/null
+++ b/src/couch_eval/src/couch_eval.erl
@@ -0,0 +1,97 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_eval).
+
+
+-export([
+    acquire_map_context/6,
+    release_map_context/1,
+    map_docs/2
+]).
+
+
+-include_lib("couch/include/couch_db.hrl").
+
+
+-type db_name() :: binary().
+-type doc_id() :: binary().
+-type ddoc_id() :: binary().
+-type language() :: binary().
+-type sig() :: binary().
+-type lib() :: any().
+-type map_fun() :: binary().
+-type map_funs() :: [map_fun()].
+-type result() :: {doc_id(), [[{any(), any()}]]}.
+-type api_mod() :: atom().
+-type context() :: {api_mod(), any()}.
+
+-type context_opts() :: #{
+    db_name := db_name(),
+    ddoc_id => ddoc_id(),
+    language => language(),
+    sig => sig(),
+    lib => lib(),
+    map_funs => map_funs(),
+    api_mod => api_mod()
+}.
+
+
+-callback acquire_map_context(context_opts()) -> {ok, any()} | {error, any()}.
+-callback release_map_context(context()) -> ok | {error, any()}.
+-callback map_docs(context(), [doc()]) -> {ok, [result()]} | {error, any()}.
+
+
+-spec acquire_map_context(
+        db_name(),
+        ddoc_id(),
+        language(),
+        sig(),
+        lib(),
+        map_funs()
+    ) -> {ok, context()} | {error, any()}.
+acquire_map_context(DbName, DDocId, Language, Sig, Lib, MapFuns) ->
+    ApiMod = get_api_mod(Language),
+    CtxOpts = #{
+        db_name => DbName,
+        ddoc_id => DDocId,
+        language => Language,
+        sig => Sig,
+        lib => Lib,
+        map_funs => MapFuns
+    },
+    {ok, Ctx} = ApiMod:acquire_map_context(CtxOpts),
+    {ok, {ApiMod, Ctx}}.
+
+
+-spec release_map_context(context()) -> ok | {error, any()}.
+release_map_context({ApiMod, Ctx}) ->
+    ApiMod:release_map_context(Ctx).
+
+
+-spec map_docs(context(), [doc()]) -> {ok, result()} | {error, any()}.
+map_docs({ApiMod, Ctx}, Docs) ->
+    ApiMod:map_docs(Ctx, Docs).
+
+
+get_api_mod(Language) when is_binary(Language) ->
+    try
+        LangStr = binary_to_list(Language),
+        ModStr = config:get("couch_eval.languages", LangStr),
+        if ModStr /= undefined -> ok; true ->
+            erlang:error({unknown_eval_api_language, Language})
+        end,
+        list_to_existing_atom(ModStr)
+    catch error:badarg ->
+        erlang:error({invalid_eval_api_mod, Language})
+    end.
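
Taken together with the `[couch_eval.languages]` entry added to default.ini in commit 03/05 above, `get_api_mod/1` resolves a language binary to a backend module at acquire time. A rough usage sketch of the facade (not part of this commit; the database name, design doc id, signature, and map function source are invented placeholders):

-module(couch_eval_usage_sketch).
-export([map_some_docs/1]).

% Hedged sketch of the acquire/map/release lifecycle exposed by couch_eval.
% Assumes a "javascript = couch_js" mapping under [couch_eval.languages].
map_some_docs(Docs) ->
    {ok, Ctx} = couch_eval:acquire_map_context(
        <<"db-name">>,                               % db_name()
        <<"_design/example">>,                       % ddoc_id()
        <<"javascript">>,                            % language(), resolved via config
        <<"sig">>,                                   % sig()
        [],                                          % lib()
        [<<"function(doc) { emit(doc._id, 1); }">>]  % map_funs()
    ),
    try
        % On success this returns {ok, [{DocId, [[{Key, Value}]]}]}.
        couch_eval:map_docs(Ctx, Docs)
    after
        couch_eval:release_map_context(Ctx)
    end.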


[couchdb] 04/05: Add tests for couch_js application

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/fdb-layer-couch-eval
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit f672644df163f4b7674fc4cfd75f7e8509d47eaa
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Tue Aug 20 14:21:00 2019 -0500

    Add tests for couch_js application
    
    These are ported over from the existing couch EUnit suite and updated to
    hopefully be less racy.
---
 src/couch_js/src/couch_js.app.src                  |   2 +-
 src/couch_js/test/couch_js_proc_manager_tests.erl  | 373 +++++++++++++++++++++
 src/couch_js/test/couch_js_query_servers_tests.erl |  96 ++++++
 3 files changed, 470 insertions(+), 1 deletion(-)

diff --git a/src/couch_js/src/couch_js.app.src b/src/couch_js/src/couch_js.app.src
index 0db37b6..44efd6d 100644
--- a/src/couch_js/src/couch_js.app.src
+++ b/src/couch_js/src/couch_js.app.src
@@ -22,6 +22,6 @@
         stdlib,
         config,
         couch_log,
-        couch
+        ioq
     ]}
  ]}.
diff --git a/src/couch_js/test/couch_js_proc_manager_tests.erl b/src/couch_js/test/couch_js_proc_manager_tests.erl
new file mode 100644
index 0000000..f138dd6
--- /dev/null
+++ b/src/couch_js/test/couch_js_proc_manager_tests.erl
@@ -0,0 +1,373 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_js_proc_manager_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+-define(TDEF(A), {atom_to_list(A), fun A/0}).
+
+-define(NUM_PROCS, 3).
+-define(TIMEOUT, 1000).
+
+-define(TIMEOUT_ERROR(Msg), erlang:error({assertion_failed, [
+        {module, ?MODULE},
+        {line, ?LINE},
+        {reason, Msg}
+    ]})).
+
+
+start() ->
+    ok = application:set_env(config, ini_files, ?CONFIG_CHAIN),
+    {ok, Started} = application:ensure_all_started(couch_js),
+    config:set("native_query_servers", "enable_erlang_query_server", "true", false),
+    config:set("query_server_config", "os_process_limit", "3", false),
+    config:set("query_server_config", "os_process_soft_limit", "2", false),
+    config:set("query_server_config", "os_process_idle_limit", "1", false),
+    ok = config_wait("os_process_idle_limit", "1"),
+    Started.
+
+
+stop(Apps) ->
+    lists:foreach(fun(App) ->
+        ok = application:stop(App)
+    end, lists:reverse(Apps)).
+
+
+couch_js_proc_manager_test_() ->
+    {
+        "couch_js_proc_manger tests",
+        {
+            setup,
+            fun start/0,
+            fun stop/1,
+            [
+                ?TDEF(should_block_new_proc_on_full_pool),
+                ?TDEF(should_free_slot_on_proc_unexpected_exit),
+                ?TDEF(should_reuse_known_proc),
+                ?TDEF(should_process_waiting_queue_as_fifo),
+                ?TDEF(should_reduce_pool_on_idle_os_procs)
+            ]
+        }
+    }.
+
+
+should_block_new_proc_on_full_pool() ->
+    ok = couch_js_proc_manager:reload(),
+
+    Clients = [
+        spawn_client(),
+        spawn_client(),
+        spawn_client()
+    ],
+
+    lists:foreach(fun(Client) ->
+        ?assertEqual(ok, ping_client(Client))
+    end, Clients),
+
+    % Make sure everyone got a different proc
+    Procs = [get_client_proc(Client) || Client <- Clients],
+    ?assertEqual(lists:sort(Procs), lists:usort(Procs)),
+
+    % This client will be stuck waiting for someone
+    % to give up their proc.
+    Client4 = spawn_client(),
+    ?assert(is_client_waiting(Client4)),
+
+    Client1 = hd(Clients),
+    Proc1 = hd(Procs),
+
+    ?assertEqual(ok, stop_client(Client1)),
+    ?assertEqual(ok, ping_client(Client4)),
+
+    Proc4 = get_client_proc(Client4),
+
+    ?assertEqual(Proc1#proc.pid, Proc4#proc.pid),
+    ?assertNotEqual(Proc1#proc.client, Proc4#proc.client),
+
+    lists:map(fun(C) ->
+        ?assertEqual(ok, stop_client(C))
+    end, [Client4 | tl(Clients)]).
+
+
+should_free_slot_on_proc_unexpected_exit() ->
+    ok = couch_js_proc_manager:reload(),
+
+    Clients = [
+        spawn_client(),
+        spawn_client(),
+        spawn_client()
+    ],
+
+    lists:foreach(fun(Client) ->
+        ?assertEqual(ok, ping_client(Client))
+    end, Clients),
+
+    Procs1 = [get_client_proc(Client) || Client <- Clients],
+    ProcClients1 = [Proc#proc.client || Proc <- Procs1],
+    ?assertEqual(lists:sort(Procs1), lists:usort(Procs1)),
+    ?assertEqual(lists:sort(ProcClients1), lists:usort(ProcClients1)),
+
+    Client1 = hd(Clients),
+    Proc1 = hd(Procs1),
+    ?assertEqual(ok, kill_client(Client1)),
+
+    Client4 = spawn_client(),
+    ?assertEqual(ok, ping_client(Client4)),
+    Proc4 = get_client_proc(Client4),
+
+    ?assertEqual(Proc1#proc.pid, Proc4#proc.pid),
+    ?assertNotEqual(Proc1#proc.client, Proc4#proc.client),
+
+    Procs2 = [Proc4 | tl(Procs1)],
+    ProcClients2 = [Proc4#proc.client | tl(ProcClients1)],
+    ?assertEqual(lists:sort(Procs2), lists:usort(Procs2)),
+    ?assertEqual(lists:sort(ProcClients2), lists:usort(ProcClients2)),
+
+    lists:map(fun(C) ->
+        ?assertEqual(ok, stop_client(C))
+    end, [Client4 | tl(Clients)]).
+
+
+should_reuse_known_proc() ->
+    ok = couch_js_proc_manager:reload(),
+
+    Clients = [
+        spawn_client(<<"ddoc1">>),
+        spawn_client(<<"ddoc2">>)
+    ],
+
+    lists:foreach(fun(Client) ->
+        ?assertEqual(ok, ping_client(Client))
+    end, Clients),
+
+    Procs = [get_client_proc(Client) || Client <- Clients],
+    ?assertEqual(lists:sort(Procs), lists:usort(Procs)),
+
+    lists:foreach(fun(Client) ->
+        ?assertEqual(ok, stop_client(Client))
+    end, Clients),
+
+    lists:foreach(fun(Proc) ->
+        ?assert(is_process_alive(Proc#proc.pid))
+    end, Procs),
+
+    Client = spawn_client(<<"ddoc1">>),
+    ?assertEqual(ok, ping_client(Client)),
+
+    OldProc = hd(Procs),
+    NewProc = get_client_proc(Client),
+
+    ?assertEqual(OldProc#proc.pid, NewProc#proc.pid),
+    ?assertNotEqual(OldProc#proc.client, NewProc#proc.client),
+    ?assertEqual(ok, stop_client(Client)).
+
+
+should_process_waiting_queue_as_fifo() ->
+    Clients = [
+        spawn_client(<<"ddoc1">>),
+        spawn_client(<<"ddoc2">>),
+        spawn_client(<<"ddoc3">>),
+        spawn_client(<<"ddoc4">>),
+        spawn_client(<<"ddoc5">>),
+        spawn_client(<<"ddoc6">>)
+    ],
+
+    lists:foldl(fun(Client, Pos) ->
+        case Pos =< ?NUM_PROCS of
+            true ->
+                ?assertEqual(ok, ping_client(Client));
+            false ->
+                ?assert(is_client_waiting(Client))
+        end,
+        Pos + 1
+    end, 1, Clients),
+
+    LastClients = lists:foldl(fun(_Iteration, ClientAcc) ->
+        FirstClient = hd(ClientAcc),
+        FirstProc = get_client_proc(FirstClient),
+        ?assertEqual(ok, stop_client(FirstClient)),
+
+        RestClients = tl(ClientAcc),
+
+        lists:foldl(fun(Client, Pos) ->
+            case Pos =< ?NUM_PROCS of
+                true ->
+                    ?assertEqual(ok, ping_client(Client));
+                false ->
+                    ?assert(is_client_waiting(Client))
+            end,
+            if Pos /= ?NUM_PROCS -> ok; true ->
+                BubbleProc = get_client_proc(Client),
+                ?assertEqual(FirstProc#proc.pid, BubbleProc#proc.pid),
+                ?assertNotEqual(FirstProc#proc.client, BubbleProc#proc.client)
+            end,
+            Pos + 1
+        end, 1, RestClients),
+
+        RestClients
+    end, Clients, lists:seq(1, 3)),
+
+    lists:foreach(fun(Client) ->
+        ?assertEqual(ok, stop_client(Client))
+    end, LastClients).
+
+
+should_reduce_pool_on_idle_os_procs() ->
+    Clients = [
+        spawn_client(<<"ddoc1">>),
+        spawn_client(<<"ddoc2">>),
+        spawn_client(<<"ddoc3">>)
+    ],
+
+    lists:foreach(fun(Client) ->
+        ?assertEqual(ok, ping_client(Client))
+    end, Clients),
+
+    ?assertEqual(3, couch_js_proc_manager:get_proc_count()),
+
+    lists:foreach(fun(Client) ->
+        ?assertEqual(ok, stop_client(Client))
+    end, Clients),
+
+    ?assertEqual(3, couch_js_proc_manager:get_proc_count()),
+
+    timer:sleep(1200),
+
+    ?assertEqual(1, couch_js_proc_manager:get_proc_count()).
+
+
+spawn_client() ->
+    Parent = self(),
+    Ref = make_ref(),
+    {Pid, _} = spawn_monitor(fun() ->
+        Parent ! {self(), initialized},
+        Proc = couch_js_query_servers:get_os_process(<<"erlang">>),
+        loop(Parent, Ref, Proc)
+    end),
+    receive
+        {Pid, initialized} ->
+            ok
+    after ?TIMEOUT ->
+        ?TIMEOUT_ERROR("Error creating client.")
+    end,
+    {Pid, Ref}.
+
+
+spawn_client(DDocId) ->
+    Parent = self(),
+    Ref = make_ref(),
+    {Pid, _} = spawn_monitor(fun() ->
+        DDocKey = {DDocId, <<"1-abcdefgh">>},
+        DDoc = #doc{body={[{<<"language">>, <<"erlang">>}]}},
+        Parent ! {self(), initialized},
+        Proc = couch_js_query_servers:get_ddoc_process(DDoc, DDocKey),
+        loop(Parent, Ref, Proc)
+    end),
+    receive
+        {Pid, initialized} ->
+            ok
+    after ?TIMEOUT ->
+        ?TIMEOUT_ERROR("Error creating ddoc client.")
+    end,
+    {Pid, Ref}.
+
+
+loop(Parent, Ref, Proc) ->
+    receive
+        ping ->
+            Parent ! {pong, Ref},
+            loop(Parent, Ref, Proc);
+        get_proc  ->
+            Parent ! {proc, Ref, Proc},
+            loop(Parent, Ref, Proc);
+        stop ->
+            couch_js_query_servers:ret_os_process(Proc),
+            Parent ! {stop, Ref};
+        die ->
+            Parent ! {die, Ref},
+            exit(some_error)
+    end.
+
+
+ping_client({Pid, Ref}) ->
+    Pid ! ping,
+    receive
+        {pong, Ref} ->
+            ok
+    after ?TIMEOUT ->
+        ?TIMEOUT_ERROR("Timeout pinging client")
+    end.
+
+
+is_client_waiting({Pid, _Ref}) ->
+    {status, Status} = process_info(Pid, status),
+    {current_function, {M, F, A}} = process_info(Pid, current_function),
+    Status == waiting andalso {M, F, A} == {gen, do_call, 4}.
+
+
+get_client_proc({Pid, Ref}) ->
+    Pid ! get_proc,
+    receive
+        {proc, Ref, Proc} -> Proc
+    after ?TIMEOUT ->
+        ?TIMEOUT_ERROR("Timeout getting proc from client")
+    end.
+
+
+stop_client({Pid, Ref}) ->
+    Pid ! stop,
+    receive
+        {stop, Ref} ->
+            ok
+    after ?TIMEOUT ->
+        ?TIMEOUT_ERROR("Timeout stopping client")
+    end,
+    receive
+        {'DOWN', _, _, Pid, _} ->
+            ok
+    after ?TIMEOUT ->
+        ?TIMEOUT_ERROR("Timeout waiting for stopped client 'DOWN'")
+    end.
+
+
+kill_client({Pid, Ref}) ->
+    Pid ! die,
+    receive
+        {die, Ref} ->
+            ok
+    after ?TIMEOUT ->
+        ?TIMEOUT_ERROR("Timeout killing client")
+    end,
+    receive
+        {'DOWN', _, _, Pid, _} ->
+            ok
+    after ?TIMEOUT ->
+        ?TIMEOUT_ERROR("Timeout waiting for killed client 'DOWN'")
+    end.
+
+
+config_wait(Key, Value) ->
+    config_wait(Key, Value, 0).
+
+config_wait(Key, Value, Count) ->
+    case config:get("query_server_config", Key) of
+        Value ->
+            ok;
+        _ when Count > 10 ->
+            ?TIMEOUT_ERROR("Error waiting for config changes.");
+        _ ->
+            timer:sleep(10),
+            config_wait(Key, Value, Count + 1)
+    end.
diff --git a/src/couch_js/test/couch_js_query_servers_tests.erl b/src/couch_js/test/couch_js_query_servers_tests.erl
new file mode 100644
index 0000000..bc4ecc7
--- /dev/null
+++ b/src/couch_js/test/couch_js_query_servers_tests.erl
@@ -0,0 +1,96 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_js_query_servers_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+
+
+setup() ->
+    meck:new([config, couch_log]).
+
+
+teardown(_) ->
+    meck:unload().
+
+
+sum_overflow_test_() ->
+    {
+        "Test overflow detection in the _sum reduce function",
+        {
+            setup,
+            fun setup/0,
+            fun teardown/1,
+            [
+                fun should_return_error_on_overflow/0,
+                fun should_return_object_on_log/0,
+                fun should_return_object_on_false/0
+            ]
+        }
+    }.
+
+
+should_return_error_on_overflow() ->
+    setup_reduce_limit_mock("true"),
+
+    KVs = gen_sum_kvs(),
+    {ok, [Result]} = couch_query_servers:reduce(<<"foo">>, [<<"_sum">>], KVs),
+    ?assertMatch({[{<<"error">>, <<"builtin_reduce_error">>} | _]}, Result),
+
+    check_reduce_limit_mock().
+
+
+should_return_object_on_log() ->
+    setup_reduce_limit_mock("log"),
+
+    KVs = gen_sum_kvs(),
+    {ok, [Result]} = couch_query_servers:reduce(<<"foo">>, [<<"_sum">>], KVs),
+    ?assertMatch({[_ | _]}, Result),
+    Keys = [K || {K, _} <- element(1, Result)],
+    ?assert(not lists:member(<<"error">>, Keys)),
+
+    check_reduce_limit_mock().
+
+
+should_return_object_on_false() ->
+    setup_reduce_limit_mock("false"),
+
+    KVs = gen_sum_kvs(),
+    {ok, [Result]} = couch_query_servers:reduce(<<"foo">>, [<<"_sum">>], KVs),
+    ?assertMatch({[_ | _]}, Result),
+    Keys = [K || {K, _} <- element(1, Result)],
+    ?assert(not lists:member(<<"error">>, Keys)),
+
+    ?assert(meck:called(config, get, '_')),
+    ?assertNot(meck:called(couch_log, error, '_')).
+
+
+gen_sum_kvs() ->
+    lists:map(fun(I) ->
+        Props = lists:map(fun(_) ->
+            K = couch_util:encodeBase64Url(crypto:strong_rand_bytes(16)),
+            {K, 1}
+        end, lists:seq(1, 20)),
+        [I, {Props}]
+    end, lists:seq(1, 10)).
+
+
+setup_reduce_limit_mock(Value) ->
+    ConfigArgs = ["query_server_config", "reduce_limit", "true"],
+    meck:reset([config, couch_log]),
+    meck:expect(config, get, ConfigArgs, Value),
+    meck:expect(couch_log, error, ['_', '_'], ok).
+
+
+check_reduce_limit_mock() ->
+    ?assert(meck:called(config, get, '_')),
+    ?assert(meck:called(couch_log, error, '_')).
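
These suites use the standard EUnit setup/teardown fixture pattern. Assuming a built CouchDB dev environment with couch_js and its dependencies on the code path, they could be invoked from an Erlang shell roughly as follows (a sketch, not part of the commit):

% Hedged sketch: run the new couch_js suites directly with EUnit.
eunit:test([couch_js_proc_manager_tests, couch_js_query_servers_tests], [verbose]).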


[couchdb] 02/05: Initial creation of couch_js application

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/fdb-layer-couch-eval
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 2a2246710392b500201bbb14525ce44526011264
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Tue Aug 20 12:43:14 2019 -0500

    Initial creation of couch_js application
    
    This commit is mostly a copy-paste of the existing modules in the
    `couch` application. For now I've left the build of the `couchjs`
    executable in `couch/priv` to avoid having to do the work of moving the
    build config over. I had contemplated just referencing the modules as
    they current exist but decided this would prepare us a bit better for
    when we eventually remove the old modules.
---
 rebar.config.script                          |   1 +
 rel/reltool.config                           |   2 +
 src/couch_js/README.md                       |   6 +
 src/couch_js/src/couch_js.app.src            |  27 ++
 src/couch_js/src/couch_js_app.erl            |  31 ++
 src/couch_js/src/couch_js_io_logger.erl      | 107 +++++
 src/couch_js/src/couch_js_native_process.erl | 452 ++++++++++++++++++
 src/couch_js/src/couch_js_os_process.erl     | 265 +++++++++++
 src/couch_js/src/couch_js_proc_manager.erl   | 602 +++++++++++++++++++++++
 src/couch_js/src/couch_js_query_servers.erl  | 683 +++++++++++++++++++++++++++
 src/couch_js/src/couch_js_sup.erl            |  45 ++
 11 files changed, 2221 insertions(+)

diff --git a/rebar.config.script b/rebar.config.script
index ba7b754..8ef1abc 100644
--- a/rebar.config.script
+++ b/rebar.config.script
@@ -79,6 +79,7 @@ SubDirs = [
     "src/mem3",
     "src/couch_index",
     "src/couch_mrview",
+    "src/couch_js",
     "src/couch_replicator",
     "src/couch_plugins",
     "src/couch_pse_tests",
diff --git a/rel/reltool.config b/rel/reltool.config
index e2ae71c..caeea38 100644
--- a/rel/reltool.config
+++ b/rel/reltool.config
@@ -41,6 +41,7 @@
         couch_replicator,
         couch_stats,
         couch_eval,
+        couch_js,
         couch_event,
         couch_peruser,
         couch_views,
@@ -95,6 +96,7 @@
     {app, couch, [{incl_cond, include}]},
     {app, couch_epi, [{incl_cond, include}]},
     {app, couch_eval, [{incl_cond, include}]},
+    {app, couch_js, [{incl_cond, include}]},
     {app, couch_jobs, [{incl_cond, include}]},
     {app, couch_index, [{incl_cond, include}]},
     {app, couch_log, [{incl_cond, include}]},
diff --git a/src/couch_js/README.md b/src/couch_js/README.md
new file mode 100644
index 0000000..4084b7d
--- /dev/null
+++ b/src/couch_js/README.md
@@ -0,0 +1,6 @@
+couch_js
+===
+
+This application isolates most of the code required for running couchjs.
+
+For the time being I'm not moving the implementation of couchjs due to the specifics of the build system configuration. Once we go to remove the `couch` application we'll have to revisit that approach.
\ No newline at end of file
diff --git a/src/couch_js/src/couch_js.app.src b/src/couch_js/src/couch_js.app.src
new file mode 100644
index 0000000..0db37b6
--- /dev/null
+++ b/src/couch_js/src/couch_js.app.src
@@ -0,0 +1,27 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+{application, couch_js, [
+    {description, "An OTP application"},
+    {vsn, git},
+    {registered, [
+        couch_js_proc_manager
+    ]},
+    {mod, {couch_js_app, []}},
+    {applications, [
+        kernel,
+        stdlib,
+        config,
+        couch_log,
+        couch
+    ]}
+ ]}.
diff --git a/src/couch_js/src/couch_js_app.erl b/src/couch_js/src/couch_js_app.erl
new file mode 100644
index 0000000..b28f585
--- /dev/null
+++ b/src/couch_js/src/couch_js_app.erl
@@ -0,0 +1,31 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_js_app).
+
+
+-behaviour(application).
+
+
+-export([
+    start/2,
+    stop/1
+]).
+
+
+start(_StartType, _StartArgs) ->
+    couch_js_sup:start_link().
+
+
+stop(_State) ->
+    ok.
\ No newline at end of file
diff --git a/src/couch_js/src/couch_js_io_logger.erl b/src/couch_js/src/couch_js_io_logger.erl
new file mode 100644
index 0000000..5a1695c
--- /dev/null
+++ b/src/couch_js/src/couch_js_io_logger.erl
@@ -0,0 +1,107 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_js_io_logger).
+
+-export([
+    start/1,
+    log_output/1,
+    log_input/1,
+    stop_noerror/0,
+    stop_error/1
+]).
+
+
+start(undefined) ->
+    ok;
+start(Dir) ->
+    case filelib:is_dir(Dir) of
+        true ->
+            Name = log_name(),
+            Path = Dir ++ "/" ++ Name,
+            OPath = Path ++ ".out.log_",
+            IPath = Path ++ ".in.log_",
+            {ok, OFd} = file:open(OPath, [read, write, raw]),
+            {ok, IFd} = file:open(IPath, [read, write, raw]),
+            ok = file:delete(OPath),
+            ok = file:delete(IPath),
+            put(logger_path, Path),
+            put(logger_out_fd, OFd),
+            put(logger_in_fd, IFd),
+            ok;
+        false ->
+            ok
+    end.
+
+
+stop_noerror() ->
+    case get(logger_path) of
+        undefined ->
+            ok;
+        _Path ->
+            close_logs()
+    end.
+
+
+stop_error(Err) ->
+    case get(logger_path) of
+        undefined ->
+            ok;
+        Path ->
+            save_error_logs(Path, Err),
+            close_logs()
+    end.
+
+
+log_output(Data) ->
+    log(get(logger_out_fd), Data).
+
+
+log_input(Data) ->
+    log(get(logger_in_fd), Data).
+
+
+unix_time() ->
+    {Mega, Sec, USec} = os:timestamp(),
+    UnixTs = (Mega * 1000000 + Sec) * 1000000 + USec,
+    integer_to_list(UnixTs).
+
+
+log_name() ->
+    Ts = unix_time(),
+    Pid0 = erlang:pid_to_list(self()),
+    Pid1 = string:strip(Pid0, left, $<),
+    Pid2 = string:strip(Pid1, right, $>),
+    lists:flatten(io_lib:format("~s_~s", [Ts, Pid2])).
+
+
+close_logs() ->
+    file:close(get(logger_out_fd)),
+    file:close(get(logger_in_fd)).
+
+
+save_error_logs(Path, Err) ->
+    Otp = erlang:system_info(otp_release),
+    Msg = io_lib:format("Error: ~p~nNode: ~p~nOTP: ~p~n", [Err, node(), Otp]),
+    file:write_file(Path ++ ".meta", Msg),
+    IFd = get(logger_out_fd),
+    OFd = get(logger_in_fd),
+    file:position(IFd, 0),
+    file:position(OFd, 0),
+    file:copy(IFd, Path ++  ".out.log"),
+    file:copy(OFd, Path ++ ".in.log").
+
+
+log(undefined, _Data) ->
+    ok;
+log(Fd, Data) ->
+    ok = file:write(Fd, [Data, io_lib:nl()]).
diff --git a/src/couch_js/src/couch_js_native_process.erl b/src/couch_js/src/couch_js_native_process.erl
new file mode 100644
index 0000000..d2c4c1e
--- /dev/null
+++ b/src/couch_js/src/couch_js_native_process.erl
@@ -0,0 +1,452 @@
+% Licensed under the Apache License, Version 2.0 (the "License");
+% you may not use this file except in compliance with the License.
+%
+% You may obtain a copy of the License at
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing,
+% software distributed under the License is distributed on an
+% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+% either express or implied.
+%
+% See the License for the specific language governing permissions
+% and limitations under the License.
+%
+% This file drew much inspiration from erlview, which was written by and
+% copyright Michael McDaniel [http://autosys.us], and is also under APL 2.0
+%
+%
+% This module provides the smallest possible native view-server.
+% With this module in-place, you can add the following to your couch INI files:
+%  [native_query_servers]
+%  erlang={couch_native_process, start_link, []}
+%
+% Which will then allow the following example map function to be used:
+%
+%  fun({Doc}) ->
+%    % Below, we emit a single record - the _id as key, null as value
+%    DocId = couch_util:get_value(<<"_id">>, Doc, null),
+%    Emit(DocId, null)
+%  end.
+%
+% which should be roughly the same as the javascript:
+%    emit(doc._id, null);
+%
+% This module exposes enough functions such that a native Erlang server can
+% act as a fully-fledged view server, but no 'helper' functions specifically
+% for simplifying your erlang view code.  It is expected other third-party
+% extensions will evolve which offer useful layers on top of this view server
+% to help simplify your view code.
+-module(couch_js_native_process).
+-behaviour(gen_server).
+-vsn(1).
+
+-export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,
+         handle_info/2]).
+-export([set_timeout/2, prompt/2]).
+
+-define(STATE, native_proc_state).
+-record(evstate, {
+    ddocs,
+    funs = [],
+    query_config = [],
+    list_pid = nil,
+    timeout = 5000,
+    idle = 5000
+}).
+
+-include_lib("couch/include/couch_db.hrl").
+
+start_link() ->
+    gen_server:start_link(?MODULE, [], []).
+
+% this is a bit messy, see also couch_query_servers handle_info
+% stop(_Pid) ->
+%     ok.
+
+set_timeout(Pid, TimeOut) ->
+    gen_server:call(Pid, {set_timeout, TimeOut}).
+
+prompt(Pid, Data) when is_list(Data) ->
+    gen_server:call(Pid, {prompt, Data}).
+
+% gen_server callbacks
+init([]) ->
+    V = config:get("query_server_config", "os_process_idle_limit", "300"),
+    Idle = list_to_integer(V) * 1000,
+    {ok, #evstate{ddocs=dict:new(), idle=Idle}, Idle}.
+
+handle_call({set_timeout, TimeOut}, _From, State) ->
+    {reply, ok, State#evstate{timeout=TimeOut}, State#evstate.idle};
+
+handle_call({prompt, Data}, _From, State) ->
+    couch_log:debug("Prompt native qs: ~s",[?JSON_ENCODE(Data)]),
+    {NewState, Resp} = try run(State, to_binary(Data)) of
+        {S, R} -> {S, R}
+        catch
+            throw:{error, Why} ->
+                {State, [<<"error">>, Why, Why]}
+        end,
+
+    Idle = State#evstate.idle,
+    case Resp of
+        {error, Reason} ->
+            Msg = io_lib:format("couch native server error: ~p", [Reason]),
+            Error = [<<"error">>, <<"native_query_server">>, list_to_binary(Msg)],
+            {reply, Error, NewState, Idle};
+        [<<"error">> | Rest] ->
+            % Msg = io_lib:format("couch native server error: ~p", [Rest]),
+            % TODO: markh? (jan)
+            {reply, [<<"error">> | Rest], NewState, Idle};
+        [<<"fatal">> | Rest] ->
+            % Msg = io_lib:format("couch native server error: ~p", [Rest]),
+            % TODO: markh? (jan)
+            {stop, fatal, [<<"error">> | Rest], NewState};
+        Resp ->
+            {reply, Resp, NewState, Idle}
+    end.
+
+handle_cast(garbage_collect, State) ->
+    erlang:garbage_collect(),
+    {noreply, State, State#evstate.idle};
+handle_cast(stop, State) ->
+    {stop, normal, State};
+handle_cast(_Msg, State) ->
+    {noreply, State, State#evstate.idle}.
+
+handle_info(timeout, State) ->
+    gen_server:cast(couch_js_proc_manager, {os_proc_idle, self()}),
+    erlang:garbage_collect(),
+    {noreply, State, State#evstate.idle};
+handle_info({'EXIT',_,normal}, State) ->
+    {noreply, State, State#evstate.idle};
+handle_info({'EXIT',_,Reason}, State) ->
+    {stop, Reason, State}.
+terminate(_Reason, _State) -> ok.
+code_change(_OldVersion, State, _Extra) -> {ok, State}.
+
+run(#evstate{list_pid=Pid}=State, [<<"list_row">>, Row]) when is_pid(Pid) ->
+    Pid ! {self(), list_row, Row},
+    receive
+        {Pid, chunks, Data} ->
+            {State, [<<"chunks">>, Data]};
+        {Pid, list_end, Data} ->
+            receive
+                {'EXIT', Pid, normal} -> ok
+            after State#evstate.timeout ->
+                throw({timeout, list_cleanup})
+            end,
+            process_flag(trap_exit, erlang:get(do_trap)),
+            {State#evstate{list_pid=nil}, [<<"end">>, Data]}
+    after State#evstate.timeout ->
+        throw({timeout, list_row})
+    end;
+run(#evstate{list_pid=Pid}=State, [<<"list_end">>]) when is_pid(Pid) ->
+    Pid ! {self(), list_end},
+    Resp =
+    receive
+        {Pid, list_end, Data} ->
+            receive
+                {'EXIT', Pid, normal} -> ok
+            after State#evstate.timeout ->
+                throw({timeout, list_cleanup})
+            end,
+            [<<"end">>, Data]
+    after State#evstate.timeout ->
+        throw({timeout, list_end})
+    end,
+    process_flag(trap_exit, erlang:get(do_trap)),
+    {State#evstate{list_pid=nil}, Resp};
+run(#evstate{list_pid=Pid}=State, _Command) when is_pid(Pid) ->
+    {State, [<<"error">>, list_error, list_error]};
+run(#evstate{ddocs=DDocs}, [<<"reset">>]) ->
+    {#evstate{ddocs=DDocs}, true};
+run(#evstate{ddocs=DDocs, idle=Idle}, [<<"reset">>, QueryConfig]) ->
+    NewState = #evstate{
+        ddocs = DDocs,
+        query_config = QueryConfig,
+        idle = Idle
+    },
+    {NewState, true};
+run(#evstate{funs=Funs}=State, [<<"add_fun">> , BinFunc]) ->
+    FunInfo = makefun(State, BinFunc),
+    {State#evstate{funs=Funs ++ [FunInfo]}, true};
+run(State, [<<"map_doc">> , Doc]) ->
+    Resp = lists:map(fun({Sig, Fun}) ->
+        erlang:put(Sig, []),
+        Fun(Doc),
+        lists:reverse(erlang:get(Sig))
+    end, State#evstate.funs),
+    {State, Resp};
+run(State, [<<"reduce">>, Funs, KVs]) ->
+    {Keys, Vals} =
+    lists:foldl(fun([K, V], {KAcc, VAcc}) ->
+        {[K | KAcc], [V | VAcc]}
+    end, {[], []}, KVs),
+    Keys2 = lists:reverse(Keys),
+    Vals2 = lists:reverse(Vals),
+    {State, catch reduce(State, Funs, Keys2, Vals2, false)};
+run(State, [<<"rereduce">>, Funs, Vals]) ->
+    {State, catch reduce(State, Funs, null, Vals, true)};
+run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, <<"new">>, DDocId, DDoc]) ->
+    DDocs2 = store_ddoc(DDocs, DDocId, DDoc),
+    {State#evstate{ddocs=DDocs2}, true};
+run(#evstate{ddocs=DDocs}=State, [<<"ddoc">>, DDocId | Rest]) ->
+    DDoc = load_ddoc(DDocs, DDocId),
+    ddoc(State, DDoc, Rest);
+run(_, Unknown) ->
+    couch_log:error("Native Process: Unknown command: ~p~n", [Unknown]),
+    throw({error, unknown_command}).
+
+ddoc(State, {DDoc}, [FunPath, Args]) ->
+    % load fun from the FunPath
+    BFun = lists:foldl(fun
+        (Key, {Props}) when is_list(Props) ->
+            couch_util:get_value(Key, Props, nil);
+        (_Key, Fun) when is_binary(Fun) ->
+            Fun;
+        (_Key, nil) ->
+            throw({error, not_found});
+        (_Key, _Fun) ->
+            throw({error, malformed_ddoc})
+        end, {DDoc}, FunPath),
+    ddoc(State, makefun(State, BFun, {DDoc}), FunPath, Args).
+
+ddoc(State, {_, Fun}, [<<"validate_doc_update">>], Args) ->
+    {State, (catch apply(Fun, Args))};
+ddoc(State, {_, Fun}, [<<"rewrites">>], Args) ->
+    {State, (catch apply(Fun, Args))};
+ddoc(State, {_, Fun}, [<<"filters">>|_], [Docs, Req]) ->
+    FilterFunWrapper = fun(Doc) ->
+        case catch Fun(Doc, Req) of
+        true -> true;
+        false -> false;
+        {'EXIT', Error} -> couch_log:error("~p", [Error])
+        end
+    end,
+    Resp = lists:map(FilterFunWrapper, Docs),
+    {State, [true, Resp]};
+ddoc(State, {_, Fun}, [<<"views">>|_], [Docs]) ->
+    MapFunWrapper = fun(Doc) ->
+        case catch Fun(Doc) of
+        undefined -> true;
+        ok -> false;
+        false -> false;
+        [_|_] -> true;
+        {'EXIT', Error} -> couch_log:error("~p", [Error])
+        end
+    end,
+    Resp = lists:map(MapFunWrapper, Docs),
+    {State, [true, Resp]};
+ddoc(State, {_, Fun}, [<<"shows">>|_], Args) ->
+    Resp = case (catch apply(Fun, Args)) of
+        FunResp when is_list(FunResp) ->
+            FunResp;
+        {FunResp} ->
+            [<<"resp">>, {FunResp}];
+        FunResp ->
+            FunResp
+    end,
+    {State, Resp};
+ddoc(State, {_, Fun}, [<<"updates">>|_], Args) ->
+    Resp = case (catch apply(Fun, Args)) of
+        [JsonDoc, JsonResp]  ->
+            [<<"up">>, JsonDoc, JsonResp]
+    end,
+    {State, Resp};
+ddoc(State, {Sig, Fun}, [<<"lists">>|_], Args) ->
+    Self = self(),
+    SpawnFun = fun() ->
+        LastChunk = (catch apply(Fun, Args)),
+        case start_list_resp(Self, Sig) of
+            started ->
+                receive
+                    {Self, list_row, _Row} -> ignore;
+                    {Self, list_end} -> ignore
+                after State#evstate.timeout ->
+                    throw({timeout, list_cleanup_pid})
+                end;
+            _ ->
+                ok
+        end,
+        LastChunks =
+        case erlang:get(Sig) of
+            undefined -> [LastChunk];
+            OtherChunks -> [LastChunk | OtherChunks]
+        end,
+        Self ! {self(), list_end, lists:reverse(LastChunks)}
+    end,
+    erlang:put(do_trap, process_flag(trap_exit, true)),
+    Pid = spawn_link(SpawnFun),
+    Resp =
+    receive
+        {Pid, start, Chunks, JsonResp} ->
+            [<<"start">>, Chunks, JsonResp]
+    after State#evstate.timeout ->
+        throw({timeout, list_start})
+    end,
+    {State#evstate{list_pid=Pid}, Resp}.
+
+store_ddoc(DDocs, DDocId, DDoc) ->
+    dict:store(DDocId, DDoc, DDocs).
+load_ddoc(DDocs, DDocId) ->
+    try dict:fetch(DDocId, DDocs) of
+        {DDoc} -> {DDoc}
+    catch
+        _:_Else -> throw({error, ?l2b(io_lib:format("Native Query Server missing DDoc with Id: ~s",[DDocId]))})
+    end.
+
+bindings(State, Sig) ->
+    bindings(State, Sig, nil).
+bindings(State, Sig, DDoc) ->
+    Self = self(),
+
+    Log = fun(Msg) ->
+        couch_log:info(Msg, [])
+    end,
+
+    Emit = fun(Id, Value) ->
+        Curr = erlang:get(Sig),
+        erlang:put(Sig, [[Id, Value] | Curr])
+    end,
+
+    Start = fun(Headers) ->
+        erlang:put(list_headers, Headers)
+    end,
+
+    Send = fun(Chunk) ->
+        Curr =
+        case erlang:get(Sig) of
+            undefined -> [];
+            Else -> Else
+        end,
+        erlang:put(Sig, [Chunk | Curr])
+    end,
+
+    GetRow = fun() ->
+        case start_list_resp(Self, Sig) of
+            started ->
+                ok;
+            _ ->
+                Chunks =
+                case erlang:get(Sig) of
+                    undefined -> [];
+                    CurrChunks -> CurrChunks
+                end,
+                Self ! {self(), chunks, lists:reverse(Chunks)}
+        end,
+        erlang:put(Sig, []),
+        receive
+            {Self, list_row, Row} -> Row;
+            {Self, list_end} -> nil
+        after State#evstate.timeout ->
+            throw({timeout, list_pid_getrow})
+        end
+    end,
+
+    FoldRows = fun(Fun, Acc) -> foldrows(GetRow, Fun, Acc) end,
+
+    Bindings = [
+        {'Log', Log},
+        {'Emit', Emit},
+        {'Start', Start},
+        {'Send', Send},
+        {'GetRow', GetRow},
+        {'FoldRows', FoldRows}
+    ],
+    case DDoc of
+        {_Props} ->
+            Bindings ++ [{'DDoc', DDoc}];
+        _Else -> Bindings
+    end.
+
+% thanks to erlview, via:
+% http://erlang.org/pipermail/erlang-questions/2003-November/010544.html
+makefun(State, Source) ->
+    Sig = couch_hash:md5_hash(Source),
+    BindFuns = bindings(State, Sig),
+    {Sig, makefun(State, Source, BindFuns)}.
+makefun(State, Source, {DDoc}) ->
+    Sig = couch_hash:md5_hash(lists:flatten([Source, term_to_binary(DDoc)])),
+    BindFuns = bindings(State, Sig, {DDoc}),
+    {Sig, makefun(State, Source, BindFuns)};
+makefun(_State, Source, BindFuns) when is_list(BindFuns) ->
+    FunStr = binary_to_list(Source),
+    {ok, Tokens, _} = erl_scan:string(FunStr),
+    Form = case (catch erl_parse:parse_exprs(Tokens)) of
+        {ok, [ParsedForm]} ->
+            ParsedForm;
+        {error, {LineNum, _Mod, [Mesg, Params]}}=Error ->
+            couch_log:error("Syntax error on line: ~p~n~s~p~n",
+                            [LineNum, Mesg, Params]),
+            throw(Error)
+    end,
+    Bindings = lists:foldl(fun({Name, Fun}, Acc) ->
+        erl_eval:add_binding(Name, Fun, Acc)
+    end, erl_eval:new_bindings(), BindFuns),
+    {value, Fun, _} = erl_eval:expr(Form, Bindings),
+    Fun.
+
+reduce(State, BinFuns, Keys, Vals, ReReduce) ->
+    Funs = case is_list(BinFuns) of
+        true ->
+            lists:map(fun(BF) -> makefun(State, BF) end, BinFuns);
+        _ ->
+            [makefun(State, BinFuns)]
+    end,
+    Reds = lists:map(fun({_Sig, Fun}) ->
+        Fun(Keys, Vals, ReReduce)
+    end, Funs),
+    [true, Reds].
+
+foldrows(GetRow, ProcRow, Acc) ->
+    case GetRow() of
+        nil ->
+            {ok, Acc};
+        Row ->
+            case (catch ProcRow(Row, Acc)) of
+                {ok, Acc2} ->
+                    foldrows(GetRow, ProcRow, Acc2);
+                {stop, Acc2} ->
+                    {ok, Acc2}
+            end
+    end.
+
+start_list_resp(Self, Sig) ->
+    case erlang:get(list_started) of
+        undefined ->
+            Headers =
+            case erlang:get(list_headers) of
+                undefined -> {[{<<"headers">>, {[]}}]};
+                CurrHdrs -> CurrHdrs
+            end,
+            Chunks =
+            case erlang:get(Sig) of
+                undefined -> [];
+                CurrChunks -> CurrChunks
+            end,
+            Self ! {self(), start, lists:reverse(Chunks), Headers},
+            erlang:put(list_started, true),
+            erlang:put(Sig, []),
+            started;
+        _ ->
+            ok
+    end.
+
+to_binary({Data}) ->
+    Pred = fun({Key, Value}) ->
+        {to_binary(Key), to_binary(Value)}
+    end,
+    {lists:map(Pred, Data)};
+to_binary(Data) when is_list(Data) ->
+    [to_binary(D) || D <- Data];
+to_binary(null) ->
+    null;
+to_binary(true) ->
+    true;
+to_binary(false) ->
+    false;
+to_binary(Data) when is_atom(Data) ->
+    list_to_binary(atom_to_list(Data));
+to_binary(Data) ->
+    Data.
diff --git a/src/couch_js/src/couch_js_os_process.erl b/src/couch_js/src/couch_js_os_process.erl
new file mode 100644
index 0000000..a453d1a
--- /dev/null
+++ b/src/couch_js/src/couch_js_os_process.erl
@@ -0,0 +1,265 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_js_os_process).
+-behaviour(gen_server).
+-vsn(1).
+
+-export([start_link/1, start_link/2, start_link/3, stop/1]).
+-export([set_timeout/2, prompt/2, killer/1]).
+-export([send/2, writeline/2, readline/1, writejson/2, readjson/1]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-define(PORT_OPTIONS, [stream, {line, 4096}, binary, exit_status, hide]).
+
+-record(os_proc,
+    {command,
+     port,
+     writer,
+     reader,
+     timeout=5000,
+     idle
+    }).
+
+start_link(Command) ->
+    start_link(Command, []).
+start_link(Command, Options) ->
+    start_link(Command, Options, ?PORT_OPTIONS).
+start_link(Command, Options, PortOptions) ->
+    gen_server:start_link(?MODULE, [Command, Options, PortOptions], []).
+
+stop(Pid) ->
+    gen_server:cast(Pid, stop).
+
+% Read/Write API
+set_timeout(Pid, TimeOut) when is_integer(TimeOut) ->
+    ok = gen_server:call(Pid, {set_timeout, TimeOut}, infinity).
+
+% Used by couch_event_os_process.erl
+send(Pid, Data) ->
+    gen_server:cast(Pid, {send, Data}).
+
+prompt(Pid, Data) ->
+    case ioq:call(Pid, {prompt, Data}, erlang:get(io_priority)) of
+        {ok, Result} ->
+            Result;
+        Error ->
+            couch_log:error("OS Process Error ~p :: ~p",[Pid,Error]),
+            throw(Error)
+    end.
+
+% Utility functions for reading and writing
+% in custom functions
+writeline(OsProc, Data) when is_record(OsProc, os_proc) ->
+    Res = port_command(OsProc#os_proc.port, [Data, $\n]),
+    couch_js_io_logger:log_output(Data),
+    Res.
+
+readline(#os_proc{} = OsProc) ->
+    Res = readline(OsProc, []),
+    couch_js_io_logger:log_input(Res),
+    Res.
+readline(#os_proc{port = Port} = OsProc, Acc) ->
+    receive
+    {Port, {data, {noeol, Data}}} when is_binary(Acc) ->
+        readline(OsProc, <<Acc/binary,Data/binary>>);
+    {Port, {data, {noeol, Data}}} when is_binary(Data) ->
+        readline(OsProc, Data);
+    {Port, {data, {noeol, Data}}} ->
+        readline(OsProc, [Data|Acc]);
+    {Port, {data, {eol, <<Data/binary>>}}} when is_binary(Acc) ->
+        [<<Acc/binary,Data/binary>>];
+    {Port, {data, {eol, Data}}} when is_binary(Data) ->
+        [Data];
+    {Port, {data, {eol, Data}}} ->
+        lists:reverse(Acc, Data);
+    {Port, Err} ->
+        catch port_close(Port),
+        throw({os_process_error, Err})
+    after OsProc#os_proc.timeout ->
+        catch port_close(Port),
+        throw({os_process_error, "OS process timed out."})
+    end.
+
+% Standard JSON functions
+writejson(OsProc, Data) when is_record(OsProc, os_proc) ->
+    JsonData = ?JSON_ENCODE(Data),
+    couch_log:debug("OS Process ~p Input  :: ~s",
+                    [OsProc#os_proc.port, JsonData]),
+    true = writeline(OsProc, JsonData).
+
+readjson(OsProc) when is_record(OsProc, os_proc) ->
+    Line = iolist_to_binary(readline(OsProc)),
+    couch_log:debug("OS Process ~p Output :: ~s", [OsProc#os_proc.port, Line]),
+    try
+        % Don't actually parse the whole JSON. Just try to see if it's
+        % a command or a doc map/reduce/filter/show/list/update output.
+        % If it's a command then parse the whole JSON and execute the
+        % command, otherwise return the raw JSON line to the caller.
+        pick_command(Line)
+    catch
+    throw:abort ->
+        {json, Line};
+    throw:{cmd, _Cmd} ->
+        case ?JSON_DECODE(Line) of
+        [<<"log">>, Msg] when is_binary(Msg) ->
+            % we got a message to log. Log it and continue
+            couch_log:info("OS Process ~p Log :: ~s",
+                           [OsProc#os_proc.port, Msg]),
+            readjson(OsProc);
+        [<<"error">>, Id, Reason] ->
+            throw({error, {couch_util:to_existing_atom(Id),Reason}});
+        [<<"fatal">>, Id, Reason] ->
+            couch_log:info("OS Process ~p Fatal Error :: ~s ~p",
+                [OsProc#os_proc.port, Id, Reason]),
+            throw({couch_util:to_existing_atom(Id),Reason});
+        _Result ->
+            {json, Line}
+        end
+    end.
+
+pick_command(Line) ->
+    json_stream_parse:events(Line, fun pick_command0/1).
+
+pick_command0(array_start) ->
+    fun pick_command1/1;
+pick_command0(_) ->
+    throw(abort).
+
+pick_command1(<<"log">> = Cmd) ->
+    throw({cmd, Cmd});
+pick_command1(<<"error">> = Cmd) ->
+    throw({cmd, Cmd});
+pick_command1(<<"fatal">> = Cmd) ->
+    throw({cmd, Cmd});
+pick_command1(_) ->
+    throw(abort).
+
+
+% gen_server API
+init([Command, Options, PortOptions]) ->
+    couch_js_io_logger:start(os:getenv("COUCHDB_IO_LOG_DIR")),
+    PrivDir = couch_util:priv_dir(),
+    Spawnkiller = "\"" ++ filename:join(PrivDir, "couchspawnkillable") ++ "\"",
+    V = config:get("query_server_config", "os_process_idle_limit", "300"),
+    IdleLimit = list_to_integer(V) * 1000,
+    BaseProc = #os_proc{
+        command=Command,
+        port=open_port({spawn, Spawnkiller ++ " " ++ Command}, PortOptions),
+        writer=fun ?MODULE:writejson/2,
+        reader=fun ?MODULE:readjson/1,
+        idle=IdleLimit
+    },
+    KillCmd = iolist_to_binary(readline(BaseProc)),
+    Pid = self(),
+    couch_log:debug("OS Process Start :: ~p", [BaseProc#os_proc.port]),
+    spawn(fun() ->
+            % this ensures the real OS process is killed when this process dies.
+            erlang:monitor(process, Pid),
+            killer(?b2l(KillCmd))
+        end),
+    OsProc =
+    lists:foldl(fun(Opt, Proc) ->
+        case Opt of
+        {writer, Writer} when is_function(Writer) ->
+            Proc#os_proc{writer=Writer};
+        {reader, Reader} when is_function(Reader) ->
+            Proc#os_proc{reader=Reader};
+        {timeout, TimeOut} when is_integer(TimeOut) ->
+            Proc#os_proc{timeout=TimeOut}
+        end
+    end, BaseProc, Options),
+    {ok, OsProc, IdleLimit}.
+
+terminate(Reason, #os_proc{port=Port}) ->
+    catch port_close(Port),
+    case Reason of
+        normal ->
+            couch_js_io_logger:stop_noerror();
+        Error ->
+            couch_js_io_logger:stop_error(Error)
+    end,
+    ok.
+
+handle_call({set_timeout, TimeOut}, _From, #os_proc{idle=Idle}=OsProc) ->
+    {reply, ok, OsProc#os_proc{timeout=TimeOut}, Idle};
+handle_call({prompt, Data}, _From, #os_proc{idle=Idle}=OsProc) ->
+    #os_proc{writer=Writer, reader=Reader} = OsProc,
+    try
+        Writer(OsProc, Data),
+        {reply, {ok, Reader(OsProc)}, OsProc, Idle}
+    catch
+        throw:{error, OsError} ->
+            {reply, OsError, OsProc, Idle};
+        throw:{fatal, OsError} ->
+            {stop, normal, OsError, OsProc};
+        throw:OtherError ->
+            {stop, normal, OtherError, OsProc}
+    after
+        garbage_collect()
+    end.
+
+handle_cast({send, Data}, #os_proc{writer=Writer, idle=Idle}=OsProc) ->
+    try
+        Writer(OsProc, Data),
+        {noreply, OsProc, Idle}
+    catch
+        throw:OsError ->
+            couch_log:error("Failed sending data: ~p -> ~p", [Data, OsError]),
+            {stop, normal, OsProc}
+    end;
+handle_cast(garbage_collect, #os_proc{idle=Idle}=OsProc) ->
+    erlang:garbage_collect(),
+    {noreply, OsProc, Idle};
+handle_cast(stop, OsProc) ->
+    {stop, normal, OsProc};
+handle_cast(Msg, #os_proc{idle=Idle}=OsProc) ->
+    couch_log:debug("OS Proc: Unknown cast: ~p", [Msg]),
+    {noreply, OsProc, Idle}.
+
+handle_info(timeout, #os_proc{idle=Idle}=OsProc) ->
+    gen_server:cast(couch_js_proc_manager, {os_proc_idle, self()}),
+    erlang:garbage_collect(),
+    {noreply, OsProc, Idle};
+handle_info({Port, {exit_status, 0}}, #os_proc{port=Port}=OsProc) ->
+    couch_log:info("OS Process terminated normally", []),
+    {stop, normal, OsProc};
+handle_info({Port, {exit_status, Status}}, #os_proc{port=Port}=OsProc) ->
+    couch_log:error("OS Process died with status: ~p", [Status]),
+    {stop, {exit_status, Status}, OsProc};
+handle_info(Msg, #os_proc{idle=Idle}=OsProc) ->
+    couch_log:debug("OS Proc: Unknown info: ~p", [Msg]),
+    {noreply, OsProc, Idle}.
+
+code_change(_, {os_proc, Cmd, Port, W, R, Timeout}, _) ->
+    V = config:get("query_server_config","os_process_idle_limit","300"),
+    State = #os_proc{
+        command = Cmd,
+        port = Port,
+        writer = W,
+        reader = R,
+        timeout = Timeout,
+        idle = list_to_integer(V) * 1000
+    },
+    {ok, State};
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+killer(KillCmd) ->
+    receive _ ->
+        os:cmd(KillCmd)
+    after 1000 ->
+        ?MODULE:killer(KillCmd)
+    end.
+
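
For orientation, a sketch of the line protocol the os_process code above speaks.
The prompts shown are assumptions drawn from readjson/1 and pick_command/1, not
output captured from this commit:

    %% Each prompt is one JSON line written by writejson/2, for example the
    %% reset prompt sent when a process is handed out:
    %%     ["reset", {"reduce_limit": true, "timeout": 5000}]
    %% Replies are read a line at a time by readjson/1, which only peeks at the
    %% first array element via pick_command/1:
    %%     ["log", "some message"]            logged, then the next line is read
    %%     ["error", "bad_request", "oops"]   thrown as {error, {bad_request, <<"oops">>}}
    %%     ["fatal", "unknown_error", "oops"] logged, then thrown, stopping the server
    %%     anything else (e.g. true)          returned raw as {json, Line} for the caller to decode
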
diff --git a/src/couch_js/src/couch_js_proc_manager.erl b/src/couch_js/src/couch_js_proc_manager.erl
new file mode 100644
index 0000000..0964696
--- /dev/null
+++ b/src/couch_js/src/couch_js_proc_manager.erl
@@ -0,0 +1,602 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_js_proc_manager).
+-behaviour(gen_server).
+-behaviour(config_listener).
+-vsn(1).
+
+-export([
+    start_link/0,
+    get_proc_count/0,
+    get_stale_proc_count/0,
+    new_proc/1,
+    reload/0,
+    terminate_stale_procs/0
+]).
+
+-export([
+    init/1,
+    terminate/2,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    code_change/3
+]).
+
+-export([
+    handle_config_change/5,
+    handle_config_terminate/3
+]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-define(PROCS, couch_js_proc_manager_procs).
+-define(WAITERS, couch_js_proc_manager_waiters).
+-define(OPENING, couch_js_proc_manager_opening).
+-define(SERVERS, couch_js_proc_manager_servers).
+-define(RELISTEN_DELAY, 5000).
+
+-record(state, {
+    config,
+    counts,
+    threshold_ts,
+    hard_limit,
+    soft_limit
+}).
+
+-type docid() :: iodata().
+-type revision() :: {integer(), binary()}.
+
+-record(client, {
+    timestamp :: os:timestamp() | '_',
+    from :: undefined | {pid(), reference()}  | '_',
+    lang :: binary() | '_',
+    ddoc :: #doc{} | '_',
+    ddoc_key :: undefined | {DDocId :: docid(), Rev :: revision()} | '_'
+}).
+
+-record(proc_int, {
+    pid,
+    lang,
+    client,
+    ddoc_keys = [],
+    prompt_fun,
+    set_timeout_fun,
+    stop_fun,
+    t0 = os:timestamp()
+}).
+
+
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+get_proc_count() ->
+    gen_server:call(?MODULE, get_proc_count).
+
+
+get_stale_proc_count() ->
+    gen_server:call(?MODULE, get_stale_proc_count).
+
+
+reload() ->
+    gen_server:call(?MODULE, set_threshold_ts).
+
+
+terminate_stale_procs() ->
+    gen_server:call(?MODULE, terminate_stale_procs).
+
+
+init([]) ->
+    process_flag(trap_exit, true),
+    ok = config:listen_for_changes(?MODULE, undefined),
+
+    TableOpts = [public, named_table, ordered_set],
+    ets:new(?PROCS, TableOpts ++ [{keypos, #proc_int.pid}]),
+    ets:new(?WAITERS, TableOpts ++ [{keypos, #client.timestamp}]),
+    ets:new(?OPENING, [public, named_table, set]),
+    ets:new(?SERVERS, [public, named_table, set]),
+    ets:insert(?SERVERS, get_servers_from_env("COUCHDB_QUERY_SERVER_")),
+    ets:insert(?SERVERS, get_servers_from_env("COUCHDB_NATIVE_QUERY_SERVER_")),
+    ets:insert(?SERVERS, [{"QUERY", {mango_native_proc, start_link, []}}]),
+    maybe_configure_erlang_native_servers(),
+
+    {ok, #state{
+        config = get_proc_config(),
+        counts = dict:new(),
+        threshold_ts = os:timestamp(),
+        hard_limit = get_hard_limit(),
+        soft_limit = get_soft_limit()
+    }}.
+
+
+terminate(_Reason, _State) ->
+    ets:foldl(fun(#proc_int{pid=P}, _) ->
+        couch_util:shutdown_sync(P)
+    end, 0, ?PROCS),
+    ok.
+
+
+handle_call(get_proc_count, _From, State) ->
+    NumProcs = ets:info(?PROCS, size),
+    NumOpening = ets:info(?OPENING, size),
+    {reply, NumProcs + NumOpening, State};
+
+handle_call(get_stale_proc_count, _From, State) ->
+    #state{threshold_ts = T0} = State,
+    MatchSpec = [{#proc_int{t0='$1', _='_'}, [{'<', '$1', {T0}}], [true]}],
+    {reply, ets:select_count(?PROCS, MatchSpec), State};
+
+handle_call({get_proc, #doc{body={Props}}=DDoc, DDocKey}, From, State) ->
+    LangStr = couch_util:get_value(<<"language">>, Props, <<"javascript">>),
+    Lang = couch_util:to_binary(LangStr),
+    Client = #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey},
+    add_waiting_client(Client),
+    {noreply, flush_waiters(State, Lang)};
+
+handle_call({get_proc, LangStr}, From, State) ->
+    Lang = couch_util:to_binary(LangStr),
+    Client = #client{from=From, lang=Lang},
+    add_waiting_client(Client),
+    {noreply, flush_waiters(State, Lang)};
+
+handle_call({ret_proc, #proc{client=Ref} = Proc}, _From, State) ->
+    erlang:demonitor(Ref, [flush]),
+    NewState = case ets:lookup(?PROCS, Proc#proc.pid) of
+        [#proc_int{}=ProcInt] ->
+            return_proc(State, ProcInt);
+        [] ->
+            % Proc must've died and we already
+            % cleared it out of the table in
+            % the handle_info clause.
+            State
+    end,
+    {reply, true, NewState};
+
+handle_call(set_threshold_ts, _From, State) ->
+    FoldFun = fun
+        (#proc_int{client = undefined} = Proc, StateAcc) ->
+            remove_proc(StateAcc, Proc);
+        (_, StateAcc) ->
+            StateAcc
+    end,
+    NewState = ets:foldl(FoldFun, State, ?PROCS),
+    {reply, ok, NewState#state{threshold_ts = os:timestamp()}};
+
+handle_call(terminate_stale_procs, _From, #state{threshold_ts = Ts1} = State) ->
+    FoldFun = fun
+        (#proc_int{client = undefined, t0 = Ts2} = Proc, StateAcc) ->
+            case Ts1 > Ts2 of
+                true ->
+                    remove_proc(StateAcc, Proc);
+                false ->
+                    StateAcc
+            end;
+        (_, StateAcc) ->
+            StateAcc
+    end,
+    NewState = ets:foldl(FoldFun, State, ?PROCS),
+    {reply, ok, NewState};
+
+handle_call(_Call, _From, State) ->
+    {reply, ignored, State}.
+
+
+handle_cast({os_proc_idle, Pid}, #state{counts=Counts}=State) ->
+    NewState = case ets:lookup(?PROCS, Pid) of
+        [#proc_int{client=undefined, lang=Lang}=Proc] ->
+            case dict:find(Lang, Counts) of
+                {ok, Count} when Count >= State#state.soft_limit ->
+                    couch_log:info("Closing idle OS Process: ~p", [Pid]),
+                    remove_proc(State, Proc);
+                {ok, _} ->
+                    State
+            end;
+        _ ->
+            State
+    end,
+    {noreply, NewState};
+
+handle_cast(reload_config, State) ->
+    NewState = State#state{
+        config = get_proc_config(),
+        hard_limit = get_hard_limit(),
+        soft_limit = get_soft_limit()
+    },
+    maybe_configure_erlang_native_servers(),
+    {noreply, flush_waiters(NewState)};
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+
+handle_info(shutdown, State) ->
+    {stop, shutdown, State};
+
+handle_info({'EXIT', Pid, {spawn_ok, Proc0, {ClientPid,_} = From}}, State) ->
+    ets:delete(?OPENING, Pid),
+    link(Proc0#proc_int.pid),
+    Proc = assign_proc(ClientPid, Proc0),
+    gen_server:reply(From, {ok, Proc, State#state.config}),
+    {noreply, State};
+
+handle_info({'EXIT', Pid, spawn_error}, State) ->
+    [{Pid, #client{lang=Lang}}] = ets:lookup(?OPENING, Pid),
+    ets:delete(?OPENING, Pid),
+    NewState = State#state{
+        counts = dict:update_counter(Lang, -1, State#state.counts)
+    },
+    {noreply, flush_waiters(NewState, Lang)};
+
+handle_info({'EXIT', Pid, Reason}, State) ->
+    couch_log:info("~p ~p died ~p", [?MODULE, Pid, Reason]),
+    case ets:lookup(?PROCS, Pid) of
+        [#proc_int{} = Proc] ->
+            NewState = remove_proc(State, Proc),
+            {noreply, flush_waiters(NewState, Proc#proc_int.lang)};
+        [] ->
+            {noreply, State}
+    end;
+
+handle_info({'DOWN', Ref, _, _, _Reason}, State0) ->
+    case ets:match_object(?PROCS, #proc_int{client=Ref, _='_'}) of
+        [#proc_int{} = Proc] ->
+            {noreply, return_proc(State0, Proc)};
+        [] ->
+            {noreply, State0}
+    end;
+
+
+handle_info(restart_config_listener, State) ->
+    ok = config:listen_for_changes(?MODULE, nil),
+    {noreply, State};
+
+handle_info(_Msg, State) ->
+    {noreply, State}.
+
+
+code_change(_OldVsn, #state{}=State, _Extra) ->
+    {ok, State}.
+
+handle_config_terminate(_, stop, _) ->
+    ok;
+handle_config_terminate(_Server, _Reason, _State) ->
+    gen_server:cast(?MODULE, reload_config),
+    erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener).
+
+handle_config_change("native_query_servers", _, _, _, _) ->
+    gen_server:cast(?MODULE, reload_config),
+    {ok, undefined};
+handle_config_change("query_server_config", _, _, _, _) ->
+    gen_server:cast(?MODULE, reload_config),
+    {ok, undefined};
+handle_config_change(_, _, _, _, _) ->
+    {ok, undefined}.
+
+
+find_proc(#client{lang = Lang, ddoc_key = undefined}) ->
+    Pred = fun(_) ->
+        true
+    end,
+    find_proc(Lang, Pred);
+find_proc(#client{lang = Lang, ddoc = DDoc, ddoc_key = DDocKey} = Client) ->
+    Pred = fun(#proc_int{ddoc_keys = DDocKeys}) ->
+        lists:member(DDocKey, DDocKeys)
+    end,
+    case find_proc(Lang, Pred) of
+        not_found ->
+            case find_proc(Client#client{ddoc_key=undefined}) of
+                {ok, Proc} ->
+                    teach_ddoc(DDoc, DDocKey, Proc);
+                Else ->
+                    Else
+            end;
+        Else ->
+            Else
+    end.
+
+find_proc(Lang, Fun) ->
+    try iter_procs(Lang, Fun)
+    catch error:Reason ->
+        StackTrace = erlang:get_stacktrace(),
+        couch_log:error("~p ~p ~p", [?MODULE, Reason, StackTrace]),
+        {error, Reason}
+    end.
+
+
+iter_procs(Lang, Fun) when is_binary(Lang) ->
+    Pattern = #proc_int{lang=Lang, client=undefined, _='_'},
+    MSpec = [{Pattern, [], ['$_']}],
+    case ets:select_reverse(?PROCS, MSpec, 25) of
+        '$end_of_table' ->
+            not_found;
+        Continuation ->
+            iter_procs_int(Continuation, Fun)
+    end.
+
+
+iter_procs_int({[], Continuation0}, Fun) ->
+    case ets:select_reverse(Continuation0) of
+        '$end_of_table' ->
+            not_found;
+        Continuation1 ->
+            iter_procs_int(Continuation1, Fun)
+    end;
+iter_procs_int({[Proc | Rest], Continuation}, Fun) ->
+    case Fun(Proc) of
+        true ->
+            {ok, Proc};
+        false ->
+            iter_procs_int({Rest, Continuation}, Fun)
+    end.
+
+
+spawn_proc(State, Client) ->
+    Pid = spawn_link(?MODULE, new_proc, [Client]),
+    ets:insert(?OPENING, {Pid, Client}),
+    Counts = State#state.counts,
+    Lang = Client#client.lang,
+    State#state{
+        counts = dict:update_counter(Lang, 1, Counts)
+    }.
+
+
+new_proc(#client{ddoc=undefined, ddoc_key=undefined}=Client) ->
+    #client{from=From, lang=Lang} = Client,
+    Resp = try
+        case new_proc_int(From, Lang) of
+            {ok, Proc} ->
+                {spawn_ok, Proc, From};
+            Error ->
+                gen_server:reply(From, {error, Error}),
+                spawn_error
+        end
+    catch _:_ ->
+        spawn_error
+    end,
+    exit(Resp);
+
+new_proc(Client) ->
+    #client{from=From, lang=Lang, ddoc=DDoc, ddoc_key=DDocKey} = Client,
+    Resp = try
+        case new_proc_int(From, Lang) of
+        {ok, NewProc} ->
+            {ok, Proc} = teach_ddoc(DDoc, DDocKey, NewProc),
+            {spawn_ok, Proc, From};
+        Error ->
+            gen_server:reply(From, {error, Error}),
+            spawn_error
+        end
+    catch _:_ ->
+        spawn_error
+    end,
+    exit(Resp).
+
+split_string_if_longer(String, Pos) ->
+    case length(String) > Pos of
+        true -> lists:split(Pos, String);
+        false -> false
+    end.
+
+split_by_char(String, Char) ->
+    %% Erlang/OTP 17.5 doesn't have string:split/2.
+    %% This function doesn't handle errors;
+    %% it is designed to be used only in a specific context.
+    Pos = string:chr(String, Char),
+    {Key, [_Eq | Value]} = lists:split(Pos - 1, String),
+    {Key, Value}.
+
+get_servers_from_env(Spec) ->
+    SpecLen = length(Spec),
+    % loop over os:getenv(), match SPEC_
+    lists:filtermap(fun(EnvStr) ->
+        case split_string_if_longer(EnvStr, SpecLen) of
+            {Spec, Rest} ->
+                {true, split_by_char(Rest, $=)};
+            _ ->
+                false
+        end
+    end, os:getenv()).
+
+get_query_server(LangStr) ->
+    case ets:lookup(?SERVERS, string:to_upper(LangStr)) of
+        [{_, Command}] -> Command;
+        _ -> undefined
+    end.
+
+native_query_server_enabled() ->
+    % 1. [native_query_servers] enable_erlang_query_server = true | false
+    % 2. if [native_query_servers] erlang == {couch_native_process, start_link, []} -> treat it as enabled as well
+    NativeEnabled = config:get_boolean("native_query_servers", "enable_erlang_query_server", false),
+    NativeLegacyConfig = config:get("native_query_servers", "erlang", ""),
+    NativeLegacyEnabled = NativeLegacyConfig =:= "{couch_native_process, start_link, []}",
+    NativeEnabled orelse NativeLegacyEnabled.
+
+maybe_configure_erlang_native_servers() ->
+    case native_query_server_enabled() of
+        true ->
+           ets:insert(?SERVERS, [
+               {"ERLANG", {couch_js_native_process, start_link, []}}]);
+        _Else ->
+           ok
+    end.
+
+new_proc_int(From, Lang) when is_binary(Lang) ->
+    LangStr = binary_to_list(Lang),
+    case get_query_server(LangStr) of
+    undefined ->
+        gen_server:reply(From, {unknown_query_language, Lang});
+    {M, F, A} ->
+        {ok, Pid} = apply(M, F, A),
+        make_proc(Pid, Lang, M);
+    Command ->
+        {ok, Pid} = couch_js_os_process:start_link(Command),
+        make_proc(Pid, Lang, couch_js_os_process)
+    end.
+
+
+teach_ddoc(DDoc, {DDocId, _Rev}=DDocKey, #proc_int{ddoc_keys=Keys}=Proc) ->
+    % Send the ddoc over the wire. We share the rev so the client knows
+    % when to update its code, but the query server only keeps the latest
+    % copy of each ddoc around.
+    true = couch_js_query_servers:proc_prompt(
+        export_proc(Proc),
+        [<<"ddoc">>, <<"new">>, DDocId, couch_doc:to_json_obj(DDoc, [])]),
+    % Remove any other ddoc keys for this docid because the
+    % query server overwrites without the rev.
+    Keys2 = [{D,R} || {D,R} <- Keys, D /= DDocId],
+    % add ddoc to the proc
+    {ok, Proc#proc_int{ddoc_keys=[DDocKey|Keys2]}}.
+
+
+make_proc(Pid, Lang, Mod) when is_binary(Lang) ->
+    Proc = #proc_int{
+        lang = Lang,
+        pid = Pid,
+        prompt_fun = {Mod, prompt},
+        set_timeout_fun = {Mod, set_timeout},
+        stop_fun = {Mod, stop}
+    },
+    unlink(Pid),
+    {ok, Proc}.
+
+
+assign_proc(Pid, #proc_int{client=undefined}=Proc0) when is_pid(Pid) ->
+    Proc = Proc0#proc_int{client = erlang:monitor(process, Pid)},
+    ets:insert(?PROCS, Proc),
+    export_proc(Proc);
+assign_proc(#client{}=Client, #proc_int{client=undefined}=Proc) ->
+    {Pid, _} = Client#client.from,
+    assign_proc(Pid, Proc).
+
+
+return_proc(#state{} = State, #proc_int{} = ProcInt) ->
+    #proc_int{pid = Pid, lang = Lang} = ProcInt,
+    NewState = case is_process_alive(Pid) of true ->
+        case ProcInt#proc_int.t0 < State#state.threshold_ts of
+            true ->
+                remove_proc(State, ProcInt);
+            false ->
+                gen_server:cast(Pid, garbage_collect),
+                true = ets:update_element(?PROCS, Pid, [
+                    {#proc_int.client, undefined}
+                ]),
+                State
+        end;
+    false ->
+        remove_proc(State, ProcInt)
+    end,
+    flush_waiters(NewState, Lang).
+
+
+remove_proc(State, #proc_int{}=Proc) ->
+    ets:delete(?PROCS, Proc#proc_int.pid),
+    case is_process_alive(Proc#proc_int.pid) of true ->
+        unlink(Proc#proc_int.pid),
+        gen_server:cast(Proc#proc_int.pid, stop);
+    false ->
+        ok
+    end,
+    Counts = State#state.counts,
+    Lang = Proc#proc_int.lang,
+    State#state{
+        counts = dict:update_counter(Lang, -1, Counts)
+    }.
+
+
+-spec export_proc(#proc_int{}) -> #proc{}.
+export_proc(#proc_int{} = ProcInt) ->
+    ProcIntList = tuple_to_list(ProcInt),
+    ProcLen = record_info(size, proc),
+    [_ | Data] = lists:sublist(ProcIntList, ProcLen),
+    list_to_tuple([proc | Data]).
+
+
+flush_waiters(State) ->
+    dict:fold(fun(Lang, Count, StateAcc) ->
+        case Count < State#state.hard_limit of
+            true ->
+                flush_waiters(StateAcc, Lang);
+            false ->
+                StateAcc
+        end
+    end, State, State#state.counts).
+
+
+flush_waiters(State, Lang) ->
+    CanSpawn = can_spawn(State, Lang),
+    case get_waiting_client(Lang) of
+        #client{from = From} = Client ->
+            case find_proc(Client) of
+                {ok, ProcInt} ->
+                    Proc = assign_proc(Client, ProcInt),
+                    gen_server:reply(From, {ok, Proc, State#state.config}),
+                    remove_waiting_client(Client),
+                    flush_waiters(State, Lang);
+                {error, Error} ->
+                    gen_server:reply(From, {error, Error}),
+                    remove_waiting_client(Client),
+                    flush_waiters(State, Lang);
+                not_found when CanSpawn ->
+                    NewState = spawn_proc(State, Client),
+                    remove_waiting_client(Client),
+                    flush_waiters(NewState, Lang);
+                not_found ->
+                    State
+            end;
+        undefined ->
+            State
+    end.
+
+
+add_waiting_client(Client) ->
+    ets:insert(?WAITERS, Client#client{timestamp=os:timestamp()}).
+
+-spec get_waiting_client(Lang :: binary()) -> undefined | #client{}.
+get_waiting_client(Lang) ->
+    case ets:match_object(?WAITERS, #client{lang=Lang, _='_'}, 1) of
+        '$end_of_table' ->
+            undefined;
+        {[#client{}=Client], _} ->
+            Client
+    end.
+
+
+remove_waiting_client(#client{timestamp = Timestamp}) ->
+    ets:delete(?WAITERS, Timestamp).
+
+
+can_spawn(#state{hard_limit = HardLimit, counts = Counts}, Lang) ->
+    case dict:find(Lang, Counts) of
+        {ok, Count} -> Count < HardLimit;
+        error -> true
+    end.
+
+
+get_proc_config() ->
+    Limit = config:get("query_server_config", "reduce_limit", "true"),
+    Timeout = config:get("couchdb", "os_process_timeout", "5000"),
+    {[
+        {<<"reduce_limit">>, list_to_atom(Limit)},
+        {<<"timeout">>, list_to_integer(Timeout)}
+    ]}.
+
+
+get_hard_limit() ->
+    LimStr = config:get("query_server_config", "os_process_limit", "100"),
+    list_to_integer(LimStr).
+
+
+get_soft_limit() ->
+    LimStr = config:get("query_server_config", "os_process_soft_limit", "100"),
+    list_to_integer(LimStr).
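
A minimal usage sketch for the proc manager above. Real callers go through
couch_js_query_servers (next file); the language string and timeout here are
illustrative:

    %% Check out an idle JavaScript process (or wait for one to be spawned),
    %% then hand it back. Mirrors get_os_process/1 and ret_os_process/1 below.
    Timeout = 5000,
    {ok, Proc, {QueryConfig}} =
        gen_server:call(couch_js_proc_manager, {get_proc, <<"javascript">>}, Timeout),
    %% ... drive Proc through its prompt_fun / set_timeout_fun ...
    true = gen_server:call(couch_js_proc_manager, {ret_proc, Proc}, infinity).
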
diff --git a/src/couch_js/src/couch_js_query_servers.erl b/src/couch_js/src/couch_js_query_servers.erl
new file mode 100644
index 0000000..12dc864
--- /dev/null
+++ b/src/couch_js/src/couch_js_query_servers.erl
@@ -0,0 +1,683 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_js_query_servers).
+
+-export([try_compile/4]).
+-export([start_doc_map/3, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]).
+-export([reduce/3, rereduce/3, validate_doc_update/5]).
+-export([filter_docs/5]).
+-export([filter_view/3]).
+-export([finalize/2]).
+-export([rewrite/3]).
+
+-export([with_ddoc_proc/2, proc_prompt/2, ddoc_prompt/3, ddoc_proc_prompt/3, json_doc/1]).
+
+% For 210-os-proc-pool.t
+-export([get_os_process/1, get_ddoc_process/2, ret_os_process/1]).
+
+-include_lib("couch/include/couch_db.hrl").
+
+-define(SUMERROR, <<"The _sum function requires that map values be numbers, "
+    "arrays of numbers, or objects. Objects cannot be mixed with other "
+    "data structures. Objects can be arbitrarily nested, provided that the values "
+    "for all fields are themselves numbers, arrays of numbers, or objects.">>).
+
+-define(STATERROR, <<"The _stats function requires that map values be numbers "
+    "or arrays of numbers, not '~p'">>).
+
+
+try_compile(Proc, FunctionType, FunctionName, FunctionSource) ->
+    try
+        proc_prompt(Proc, [<<"add_fun">>, FunctionSource]),
+        ok
+    catch
+        {compilation_error, E} ->
+            Fmt = "Compilation of the ~s function in the '~s' view failed: ~s",
+            Msg = io_lib:format(Fmt, [FunctionType, FunctionName, E]),
+            throw({compilation_error, Msg});
+        {os_process_error, {exit_status, ExitStatus}} ->
+            Fmt = "Compilation of the ~s function in the '~s' view failed with exit status: ~p",
+            Msg = io_lib:format(Fmt, [FunctionType, FunctionName, ExitStatus]),
+            throw({compilation_error, Msg})
+    end.
+
+start_doc_map(Lang, Functions, Lib) ->
+    Proc = get_os_process(Lang),
+    case Lib of
+    {[]} -> ok;
+    Lib ->
+        true = proc_prompt(Proc, [<<"add_lib">>, Lib])
+    end,
+    lists:foreach(fun(FunctionSource) ->
+        true = proc_prompt(Proc, [<<"add_fun">>, FunctionSource])
+    end, Functions),
+    {ok, Proc}.
+
+map_doc_raw(Proc, Doc) ->
+    Json = couch_doc:to_json_obj(Doc, []),
+    {ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}.
+
+
+stop_doc_map(nil) ->
+    ok;
+stop_doc_map(Proc) ->
+    ok = ret_os_process(Proc).
+
+group_reductions_results([]) ->
+    [];
+group_reductions_results(List) ->
+    {Heads, Tails} = lists:foldl(
+        fun([H|T], {HAcc,TAcc}) ->
+            {[H|HAcc], [T|TAcc]}
+        end, {[], []}, List),
+    case Tails of
+    [[]|_] -> % no tails left
+        [Heads];
+    _ ->
+        [Heads | group_reductions_results(Tails)]
+    end.
+
+finalize(<<"_approx_count_distinct",_/binary>>, Reduction) ->
+    true = hyper:is_hyper(Reduction),
+    {ok, round(hyper:card(Reduction))};
+finalize(<<"_stats",_/binary>>, Unpacked) ->
+    {ok, pack_stats(Unpacked)};
+finalize(_RedSrc, Reduction) ->
+    {ok, Reduction}.
+
+rereduce(_Lang, [], _ReducedValues) ->
+    {ok, []};
+rereduce(Lang, RedSrcs, ReducedValues) ->
+    Grouped = group_reductions_results(ReducedValues),
+    Results = lists:zipwith(
+        fun
+        (<<"_", _/binary>> = FunSrc, Values) ->
+            {ok, [Result]} = builtin_reduce(rereduce, [FunSrc], [[[], V] || V <- Values], []),
+            Result;
+        (FunSrc, Values) ->
+            os_rereduce(Lang, [FunSrc], Values)
+        end, RedSrcs, Grouped),
+    {ok, Results}.
+
+reduce(_Lang, [], _KVs) ->
+    {ok, []};
+reduce(Lang, RedSrcs, KVs) ->
+    {OsRedSrcs, BuiltinReds} = lists:partition(fun
+        (<<"_", _/binary>>) -> false;
+        (_OsFun) -> true
+    end, RedSrcs),
+    {ok, OsResults} = os_reduce(Lang, OsRedSrcs, KVs),
+    {ok, BuiltinResults} = builtin_reduce(reduce, BuiltinReds, KVs, []),
+    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, []).
+
+
+recombine_reduce_results([], [], [], Acc) ->
+    {ok, lists:reverse(Acc)};
+recombine_reduce_results([<<"_", _/binary>>|RedSrcs], OsResults, [BRes|BuiltinResults], Acc) ->
+    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, [BRes|Acc]);
+recombine_reduce_results([_OsFun|RedSrcs], [OsR|OsResults], BuiltinResults, Acc) ->
+    recombine_reduce_results(RedSrcs, OsResults, BuiltinResults, [OsR|Acc]).
+
+os_reduce(_Lang, [], _KVs) ->
+    {ok, []};
+os_reduce(Lang, OsRedSrcs, KVs) ->
+    Proc = get_os_process(Lang),
+    OsResults = try proc_prompt(Proc, [<<"reduce">>, OsRedSrcs, KVs]) of
+        [true, Reductions] -> Reductions
+    catch
+        throw:{reduce_overflow_error, Msg} ->
+            [{[{reduce_overflow_error, Msg}]} || _ <- OsRedSrcs]
+    after
+        ok = ret_os_process(Proc)
+    end,
+    {ok, OsResults}.
+
+os_rereduce(Lang, OsRedSrcs, KVs) ->
+    case get_overflow_error(KVs) of
+        undefined ->
+            Proc = get_os_process(Lang),
+            try proc_prompt(Proc, [<<"rereduce">>, OsRedSrcs, KVs]) of
+                [true, [Reduction]] -> Reduction
+            catch
+                throw:{reduce_overflow_error, Msg} ->
+                    {[{reduce_overflow_error, Msg}]}
+            after
+                ok = ret_os_process(Proc)
+            end;
+        Error ->
+            Error
+    end.
+
+
+get_overflow_error([]) ->
+    undefined;
+get_overflow_error([{[{reduce_overflow_error, _}]} = Error | _]) ->
+    Error;
+get_overflow_error([_ | Rest]) ->
+    get_overflow_error(Rest).
+
+
+builtin_reduce(_Re, [], _KVs, Acc) ->
+    {ok, lists:reverse(Acc)};
+builtin_reduce(Re, [<<"_sum",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Sum = builtin_sum_rows(KVs, 0),
+    Red = check_sum_overflow(?term_size(KVs), ?term_size(Sum), Sum),
+    builtin_reduce(Re, BuiltinReds, KVs, [Red|Acc]);
+builtin_reduce(reduce, [<<"_count",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Count = length(KVs),
+    builtin_reduce(reduce, BuiltinReds, KVs, [Count|Acc]);
+builtin_reduce(rereduce, [<<"_count",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Count = builtin_sum_rows(KVs, 0),
+    builtin_reduce(rereduce, BuiltinReds, KVs, [Count|Acc]);
+builtin_reduce(Re, [<<"_stats",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Stats = builtin_stats(Re, KVs),
+    builtin_reduce(Re, BuiltinReds, KVs, [Stats|Acc]);
+builtin_reduce(Re, [<<"_approx_count_distinct",_/binary>>|BuiltinReds], KVs, Acc) ->
+    Distinct = approx_count_distinct(Re, KVs),
+    builtin_reduce(Re, BuiltinReds, KVs, [Distinct|Acc]).
+
+
+builtin_sum_rows([], Acc) ->
+    Acc;
+builtin_sum_rows([[_Key, Value] | RestKVs], Acc) ->
+    try sum_values(Value, Acc) of
+        NewAcc ->
+            builtin_sum_rows(RestKVs, NewAcc)
+    catch
+        throw:{builtin_reduce_error, Obj} ->
+            Obj;
+        throw:{invalid_value, Reason, Cause} ->
+            {[{<<"error">>, <<"builtin_reduce_error">>},
+                {<<"reason">>, Reason}, {<<"caused_by">>, Cause}]}
+    end.
+
+
+sum_values(Value, Acc) when is_number(Value), is_number(Acc) ->
+    Acc + Value;
+sum_values(Value, Acc) when is_list(Value), is_list(Acc) ->
+    sum_arrays(Acc, Value);
+sum_values(Value, Acc) when is_number(Value), is_list(Acc) ->
+    sum_arrays(Acc, [Value]);
+sum_values(Value, Acc) when is_list(Value), is_number(Acc) ->
+    sum_arrays([Acc], Value);
+sum_values({Props}, Acc) ->
+    case lists:keyfind(<<"error">>, 1, Props) of
+        {<<"error">>, <<"builtin_reduce_error">>} ->
+            throw({builtin_reduce_error, {Props}});
+        false ->
+            ok
+    end,
+    case Acc of
+        0 ->
+            {Props};
+        {AccProps} ->
+            {sum_objects(lists:sort(Props), lists:sort(AccProps))}
+    end;
+sum_values(Else, _Acc) ->
+    throw_sum_error(Else).
+
+sum_objects([{K1, V1} | Rest1], [{K1, V2} | Rest2]) ->
+    [{K1, sum_values(V1, V2)} | sum_objects(Rest1, Rest2)];
+sum_objects([{K1, V1} | Rest1], [{K2, V2} | Rest2]) when K1 < K2 ->
+    [{K1, V1} | sum_objects(Rest1, [{K2, V2} | Rest2])];
+sum_objects([{K1, V1} | Rest1], [{K2, V2} | Rest2]) when K1 > K2 ->
+    [{K2, V2} | sum_objects([{K1, V1} | Rest1], Rest2)];
+sum_objects([], Rest) ->
+    Rest;
+sum_objects(Rest, []) ->
+    Rest.
+
+sum_arrays([], []) ->
+    [];
+sum_arrays([_|_]=Xs, []) ->
+    Xs;
+sum_arrays([], [_|_]=Ys) ->
+    Ys;
+sum_arrays([X|Xs], [Y|Ys]) when is_number(X), is_number(Y) ->
+    [X+Y | sum_arrays(Xs,Ys)];
+sum_arrays(Else, _) ->
+    throw_sum_error(Else).
+
+check_sum_overflow(InSize, OutSize, Sum) ->
+    Overflowed = OutSize > 4906 andalso OutSize * 2 > InSize,
+    case config:get("query_server_config", "reduce_limit", "true") of
+        "true" when Overflowed ->
+            Msg = log_sum_overflow(InSize, OutSize),
+            {[
+                {<<"error">>, <<"builtin_reduce_error">>},
+                {<<"reason">>, Msg}
+            ]};
+        "log" when Overflowed ->
+            log_sum_overflow(InSize, OutSize),
+            Sum;
+        _ ->
+            Sum
+    end.
+
+log_sum_overflow(InSize, OutSize) ->
+    Fmt = "Reduce output must shrink more rapidly: "
+            "input size: ~b "
+            "output size: ~b",
+    Msg = iolist_to_binary(io_lib:format(Fmt, [InSize, OutSize])),
+    couch_log:error(Msg, []),
+    Msg.
+
+builtin_stats(_, []) ->
+    {0, 0, 0, 0, 0};
+builtin_stats(_, [[_,First]|Rest]) ->
+    lists:foldl(fun([_Key, Value], Acc) ->
+        stat_values(Value, Acc)
+    end, build_initial_accumulator(First), Rest).
+
+stat_values(Value, Acc) when is_list(Value), is_list(Acc) ->
+    lists:zipwith(fun stat_values/2, Value, Acc);
+stat_values({PreRed}, Acc) when is_list(PreRed) ->
+    stat_values(unpack_stats({PreRed}), Acc);
+stat_values(Value, Acc) when is_number(Value) ->
+    stat_values({Value, 1, Value, Value, Value*Value}, Acc);
+stat_values(Value, Acc) when is_number(Acc) ->
+    stat_values(Value, {Acc, 1, Acc, Acc, Acc*Acc});
+stat_values(Value, Acc) when is_tuple(Value), is_tuple(Acc) ->
+    {Sum0, Cnt0, Min0, Max0, Sqr0} = Value,
+    {Sum1, Cnt1, Min1, Max1, Sqr1} = Acc,
+    {
+      Sum0 + Sum1,
+      Cnt0 + Cnt1,
+      erlang:min(Min0, Min1),
+      erlang:max(Max0, Max1),
+      Sqr0 + Sqr1
+    };
+stat_values(Else, _Acc) ->
+    throw_stat_error(Else).
+
+build_initial_accumulator(L) when is_list(L) ->
+    [build_initial_accumulator(X) || X <- L];
+build_initial_accumulator(X) when is_number(X) ->
+    {X, 1, X, X, X*X};
+build_initial_accumulator({_, _, _, _, _} = AlreadyUnpacked) ->
+    AlreadyUnpacked;
+build_initial_accumulator({Props}) ->
+    unpack_stats({Props});
+build_initial_accumulator(Else) ->
+    Msg = io_lib:format("non-numeric _stats input: ~w", [Else]),
+    throw({invalid_value, iolist_to_binary(Msg)}).
+
+unpack_stats({PreRed}) when is_list(PreRed) ->
+    {
+      get_number(<<"sum">>, PreRed),
+      get_number(<<"count">>, PreRed),
+      get_number(<<"min">>, PreRed),
+      get_number(<<"max">>, PreRed),
+      get_number(<<"sumsqr">>, PreRed)
+    }.
+
+
+pack_stats({Sum, Cnt, Min, Max, Sqr}) ->
+    {[{<<"sum">>,Sum}, {<<"count">>,Cnt}, {<<"min">>,Min}, {<<"max">>,Max}, {<<"sumsqr">>,Sqr}]};
+pack_stats({Packed}) ->
+    % Legacy code path before we had the finalize operation
+    {Packed};
+pack_stats(Stats) when is_list(Stats) ->
+    lists:map(fun pack_stats/1, Stats).
+
+get_number(Key, Props) ->
+    case couch_util:get_value(Key, Props) of
+    X when is_number(X) ->
+        X;
+    undefined when is_binary(Key) ->
+        get_number(binary_to_atom(Key, latin1), Props);
+    undefined ->
+        Msg = io_lib:format("user _stats input missing required field ~s (~p)",
+            [Key, Props]),
+        throw({invalid_value, iolist_to_binary(Msg)});
+    Else ->
+        Msg = io_lib:format("non-numeric _stats input received for ~s: ~w",
+            [Key, Else]),
+        throw({invalid_value, iolist_to_binary(Msg)})
+    end.
+
+% TODO allow customization of precision in the ddoc.
+approx_count_distinct(reduce, KVs) ->
+    lists:foldl(fun([[Key, _Id], _Value], Filter) ->
+        hyper:insert(term_to_binary(Key), Filter)
+    end, hyper:new(11), KVs);
+approx_count_distinct(rereduce, Reds) ->
+    hyper:union([Filter || [_, Filter] <- Reds]).
+
+% use the function stored in ddoc.validate_doc_update to test an update.
+-spec validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) -> ok when
+    DDoc    :: ddoc(),
+    EditDoc :: doc(),
+    DiskDoc :: doc() | nil,
+    Ctx     :: user_ctx(),
+    SecObj  :: sec_obj().
+
+validate_doc_update(DDoc, EditDoc, DiskDoc, Ctx, SecObj) ->
+    JsonEditDoc = couch_doc:to_json_obj(EditDoc, [revs]),
+    JsonDiskDoc = json_doc(DiskDoc),
+    Resp = ddoc_prompt(
+        DDoc,
+        [<<"validate_doc_update">>],
+        [JsonEditDoc, JsonDiskDoc, Ctx, SecObj]
+    ),
+    if Resp == 1 -> ok; true ->
+        couch_stats:increment_counter([couchdb, query_server, vdu_rejects], 1)
+    end,
+    case Resp of
+        RespCode when RespCode =:= 1; RespCode =:= ok; RespCode =:= true ->
+            ok;
+        {[{<<"forbidden">>, Message}]} ->
+            throw({forbidden, Message});
+        {[{<<"unauthorized">>, Message}]} ->
+            throw({unauthorized, Message});
+        {[{_, Message}]} ->
+            throw({unknown_error, Message});
+        Message when is_binary(Message) ->
+            throw({unknown_error, Message})
+    end.
+
+
+rewrite(Req, Db, DDoc) ->
+    Fields = [F || F <- chttpd_external:json_req_obj_fields(),
+              F =/= <<"info">>, F =/= <<"form">>,
+              F =/= <<"uuid">>, F =/= <<"id">>],
+    JsonReq = chttpd_external:json_req_obj(Req, Db, null, Fields),
+    case ddoc_prompt(DDoc, [<<"rewrites">>], [JsonReq]) of
+        {[{<<"forbidden">>, Message}]} ->
+            throw({forbidden, Message});
+        {[{<<"unauthorized">>, Message}]} ->
+            throw({unauthorized, Message});
+        [<<"no_dispatch_rule">>] ->
+            undefined;
+        [<<"ok">>, {V}=Rewrite] when is_list(V) ->
+            ok = validate_rewrite_response(Rewrite),
+            Rewrite;
+        [<<"ok">>, _]  ->
+            throw_rewrite_error(<<"bad rewrite">>);
+        V ->
+            couch_log:error("bad rewrite return ~p", [V]),
+            throw({unknown_error, V})
+    end.
+
+validate_rewrite_response({Fields}) when is_list(Fields) ->
+    validate_rewrite_response_fields(Fields).
+
+validate_rewrite_response_fields([{Key, Value} | Rest]) ->
+    validate_rewrite_response_field(Key, Value),
+    validate_rewrite_response_fields(Rest);
+validate_rewrite_response_fields([]) ->
+    ok.
+
+validate_rewrite_response_field(<<"method">>, Method) when is_binary(Method) ->
+    ok;
+validate_rewrite_response_field(<<"method">>, _) ->
+    throw_rewrite_error(<<"bad method">>);
+validate_rewrite_response_field(<<"path">>, Path) when is_binary(Path) ->
+    ok;
+validate_rewrite_response_field(<<"path">>, _) ->
+    throw_rewrite_error(<<"bad path">>);
+validate_rewrite_response_field(<<"body">>, Body) when is_binary(Body) ->
+    ok;
+validate_rewrite_response_field(<<"body">>, _) ->
+    throw_rewrite_error(<<"bad body">>);
+validate_rewrite_response_field(<<"headers">>, {Props}=Headers) when is_list(Props) ->
+    validate_object_fields(Headers);
+validate_rewrite_response_field(<<"headers">>, _) ->
+    throw_rewrite_error(<<"bad headers">>);
+validate_rewrite_response_field(<<"query">>, {Props}=Query) when is_list(Props) ->
+    validate_object_fields(Query);
+validate_rewrite_response_field(<<"query">>, _) ->
+    throw_rewrite_error(<<"bad query">>);
+validate_rewrite_response_field(<<"code">>, Code) when is_integer(Code) andalso Code >= 200 andalso Code < 600 ->
+    ok;
+validate_rewrite_response_field(<<"code">>, _) ->
+    throw_rewrite_error(<<"bad code">>);
+validate_rewrite_response_field(K, V) ->
+    couch_log:debug("unknown rewrite field ~p=~p", [K, V]),
+    ok.
+
+validate_object_fields({Props}) when is_list(Props) ->
+    lists:foreach(fun
+        ({Key, Value}) when is_binary(Key) andalso is_binary(Value) ->
+            ok;
+        ({Key, Value}) ->
+            Reason = io_lib:format(
+                "object key/value must be strings ~p=~p", [Key, Value]),
+            throw_rewrite_error(Reason);
+        (Value) ->
+            throw_rewrite_error(io_lib:format("bad value ~p", [Value]))
+    end, Props).
+
+
+throw_rewrite_error(Reason) when is_list(Reason)->
+    throw_rewrite_error(iolist_to_binary(Reason));
+throw_rewrite_error(Reason) when is_binary(Reason) ->
+    throw({rewrite_error, Reason}).
+
+
+json_doc_options() ->
+    json_doc_options([]).
+
+json_doc_options(Options) ->
+    Limit = config:get_integer("query_server_config", "revs_limit", 20),
+    [{revs, Limit} | Options].
+
+json_doc(Doc) ->
+    json_doc(Doc, json_doc_options()).
+
+json_doc(nil, _) ->
+    null;
+json_doc(Doc, Options) ->
+    couch_doc:to_json_obj(Doc, Options).
+
+filter_view(DDoc, VName, Docs) ->
+    Options = json_doc_options(),
+    JsonDocs = [json_doc(Doc, Options) || Doc <- Docs],
+    [true, Passes] = ddoc_prompt(DDoc, [<<"views">>, VName, <<"map">>], [JsonDocs]),
+    {ok, Passes}.
+
+filter_docs(Req, Db, DDoc, FName, Docs) ->
+    JsonReq = case Req of
+        {json_req, JsonObj} ->
+            JsonObj;
+        #httpd{} = HttpReq ->
+            couch_httpd_external:json_req_obj(HttpReq, Db)
+    end,
+    Options = json_doc_options(),
+    JsonDocs = [json_doc(Doc, Options) || Doc <- Docs],
+    [true, Passes] = ddoc_prompt(DDoc, [<<"filters">>, FName],
+        [JsonDocs, JsonReq]),
+    {ok, Passes}.
+
+ddoc_proc_prompt({Proc, DDocId}, FunPath, Args) ->
+    proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args]).
+
+ddoc_prompt(DDoc, FunPath, Args) ->
+    with_ddoc_proc(DDoc, fun({Proc, DDocId}) ->
+        proc_prompt(Proc, [<<"ddoc">>, DDocId, FunPath, Args])
+    end).
+
+with_ddoc_proc(#doc{id=DDocId,revs={Start, [DiskRev|_]}}=DDoc, Fun) ->
+    Rev = couch_doc:rev_to_str({Start, DiskRev}),
+    DDocKey = {DDocId, Rev},
+    Proc = get_ddoc_process(DDoc, DDocKey),
+    try Fun({Proc, DDocId})
+    after
+        ok = ret_os_process(Proc)
+    end.
+
+proc_prompt(Proc, Args) ->
+     case proc_prompt_raw(Proc, Args) of
+     {json, Json} ->
+         ?JSON_DECODE(Json);
+     EJson ->
+         EJson
+     end.
+
+proc_prompt_raw(#proc{prompt_fun = {Mod, Func}} = Proc, Args) ->
+    apply(Mod, Func, [Proc#proc.pid, Args]).
+
+raw_to_ejson({json, Json}) ->
+    ?JSON_DECODE(Json);
+raw_to_ejson(EJson) ->
+    EJson.
+
+proc_stop(Proc) ->
+    {Mod, Func} = Proc#proc.stop_fun,
+    apply(Mod, Func, [Proc#proc.pid]).
+
+proc_set_timeout(Proc, Timeout) ->
+    {Mod, Func} = Proc#proc.set_timeout_fun,
+    apply(Mod, Func, [Proc#proc.pid, Timeout]).
+
+get_os_process_timeout() ->
+    list_to_integer(config:get("couchdb", "os_process_timeout", "5000")).
+
+get_ddoc_process(#doc{} = DDoc, DDocKey) ->
+    % remove this case statement
+    case gen_server:call(couch_js_proc_manager, {get_proc, DDoc, DDocKey}, get_os_process_timeout()) of
+    {ok, Proc, {QueryConfig}} ->
+        % process knows the ddoc
+        case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
+        true ->
+            proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),
+            Proc;
+        _ ->
+            catch proc_stop(Proc),
+            get_ddoc_process(DDoc, DDocKey)
+        end;
+    Error ->
+        throw(Error)
+    end.
+
+get_os_process(Lang) ->
+    case gen_server:call(couch_js_proc_manager, {get_proc, Lang}, get_os_process_timeout()) of
+    {ok, Proc, {QueryConfig}} ->
+        case (catch proc_prompt(Proc, [<<"reset">>, {QueryConfig}])) of
+        true ->
+            proc_set_timeout(Proc, couch_util:get_value(<<"timeout">>, QueryConfig)),
+            Proc;
+        _ ->
+            catch proc_stop(Proc),
+            get_os_process(Lang)
+        end;
+    Error ->
+        throw(Error)
+    end.
+
+ret_os_process(Proc) ->
+    true = gen_server:call(couch_js_proc_manager, {ret_proc, Proc}, infinity),
+    catch unlink(Proc#proc.pid),
+    ok.
+
+throw_sum_error(Else) ->
+    throw({invalid_value, ?SUMERROR, Else}).
+
+throw_stat_error(Else) ->
+    throw({invalid_value, iolist_to_binary(io_lib:format(?STATERROR, [Else]))}).
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+builtin_sum_rows_negative_test() ->
+    A = [{[{<<"a">>, 1}]}, {[{<<"a">>, 2}]}, {[{<<"a">>, 3}]}],
+    E = {[{<<"error">>, <<"builtin_reduce_error">>}]},
+    ?assertEqual(E, builtin_sum_rows([["K", E]], [])),
+    % The case below is where the value is invalid, but no error is raised
+    % because it's only one document.
+    ?assertEqual(A, builtin_sum_rows([["K", A]], [])),
+    {Result} = builtin_sum_rows([["K", A]], [1, 2, 3]),
+    ?assertEqual({<<"error">>, <<"builtin_reduce_error">>},
+        lists:keyfind(<<"error">>, 1, Result)).
+
+sum_values_test() ->
+    ?assertEqual(3, sum_values(1, 2)),
+    ?assertEqual([2,4,6], sum_values(1, [1,4,6])),
+    ?assertEqual([3,5,7], sum_values([3,2,4], [0,3,3])),
+    X = {[{<<"a">>,1}, {<<"b">>,[1,2]}, {<<"c">>, {[{<<"d">>,3}]}},
+            {<<"g">>,1}]},
+    Y = {[{<<"a">>,2}, {<<"b">>,3}, {<<"c">>, {[{<<"e">>, 5}]}},
+            {<<"f">>,1}, {<<"g">>,1}]},
+    Z = {[{<<"a">>,3}, {<<"b">>,[4,2]}, {<<"c">>, {[{<<"d">>,3},{<<"e">>,5}]}},
+            {<<"f">>,1}, {<<"g">>,2}]},
+    ?assertEqual(Z, sum_values(X, Y)),
+    ?assertEqual(Z, sum_values(Y, X)).
+
+sum_values_negative_test() ->
+    % invalid value
+    A = [{[{<<"a">>, 1}]}, {[{<<"a">>, 2}]}, {[{<<"a">>, 3}]}],
+    B = ["error 1", "error 2"],
+    C = [<<"error 3">>, <<"error 4">>],
+    KV = {[{<<"error">>, <<"builtin_reduce_error">>},
+        {<<"reason">>, ?SUMERROR}, {<<"caused_by">>, <<"some cause">>}]},
+    ?assertThrow({invalid_value, _, _}, sum_values(A, [1, 2, 3])),
+    ?assertThrow({invalid_value, _, _}, sum_values(A, 0)),
+    ?assertThrow({invalid_value, _, _}, sum_values(B, [1, 2])),
+    ?assertThrow({invalid_value, _, _}, sum_values(C, [0])),
+    ?assertThrow({builtin_reduce_error, KV}, sum_values(KV, [0])).
+
+stat_values_test() ->
+    ?assertEqual({1, 2, 0, 1, 1}, stat_values(1, 0)),
+    ?assertEqual({11, 2, 1, 10, 101}, stat_values(1, 10)),
+    ?assertEqual([{9, 2, 2, 7, 53},
+                  {14, 2, 3, 11, 130},
+                  {18, 2, 5, 13, 194}
+                 ], stat_values([2,3,5], [7,11,13])).
+
+reduce_stats_test() ->
+    ?assertEqual([
+        {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]}
+    ], test_reduce(<<"_stats">>, [[[null, key], 2]])),
+
+    ?assertEqual([[
+        {[{<<"sum">>,1},{<<"count">>,1},{<<"min">>,1},{<<"max">>,1},{<<"sumsqr">>,1}]},
+        {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]}
+    ]], test_reduce(<<"_stats">>, [[[null, key],[1,2]]])),
+
+    ?assertEqual(
+         {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]}
+    , element(2, finalize(<<"_stats">>, {2, 1, 2, 2, 4}))),
+
+    ?assertEqual([
+        {[{<<"sum">>,1},{<<"count">>,1},{<<"min">>,1},{<<"max">>,1},{<<"sumsqr">>,1}]},
+        {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]}
+    ], element(2, finalize(<<"_stats">>, [
+        {1, 1, 1, 1, 1},
+        {2, 1, 2, 2, 4}
+    ]))),
+
+    ?assertEqual([
+        {[{<<"sum">>,1},{<<"count">>,1},{<<"min">>,1},{<<"max">>,1},{<<"sumsqr">>,1}]},
+        {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]}
+    ], element(2, finalize(<<"_stats">>, [
+        {1, 1, 1, 1, 1},
+        {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]}
+    ]))),
+
+    ?assertEqual([
+        {[{<<"sum">>,1},{<<"count">>,1},{<<"min">>,1},{<<"max">>,1},{<<"sumsqr">>,1}]},
+        {[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]}
+    ], element(2, finalize(<<"_stats">>, [
+        {[{<<"sum">>,1},{<<"count">>,1},{<<"min">>,1},{<<"max">>,1},{<<"sumsqr">>,1}]},
+        {2, 1, 2, 2, 4}
+    ]))),
+    ok.
+
+test_reduce(Reducer, KVs) ->
+    ?assertMatch({ok, _}, reduce(<<"javascript">>, [Reducer], KVs)),
+    {ok, Reduced} = reduce(<<"javascript">>, [Reducer], KVs),
+    {ok, Finalized} = finalize(Reducer, Reduced),
+    Finalized.
+
+-endif.
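
A small, illustrative check of the builtin reducers above from an Erlang shell.
No JavaScript process is needed because only builtin functions are involved:

    1> couch_js_query_servers:reduce(<<"javascript">>, [<<"_count">>], [[[null, <<"k1">>], 1], [[null, <<"k2">>], 5]]).
    {ok,[2]}
    2> couch_js_query_servers:finalize(<<"_stats">>, {2, 1, 2, 2, 4}).
    {ok,{[{<<"sum">>,2},{<<"count">>,1},{<<"min">>,2},{<<"max">>,2},{<<"sumsqr">>,4}]}}

_count simply counts rows on the reduce pass, and finalize/2 packs the
{Sum, Count, Min, Max, SumSqr} accumulator into the JSON object returned to
view clients.
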
diff --git a/src/couch_js/src/couch_js_sup.erl b/src/couch_js/src/couch_js_sup.erl
new file mode 100644
index 0000000..e875461
--- /dev/null
+++ b/src/couch_js/src/couch_js_sup.erl
@@ -0,0 +1,45 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+
+-module(couch_js_sup).
+-behaviour(supervisor).
+
+
+-export([
+    start_link/0
+]).
+
+-export([
+    init/1
+]).
+
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+
+init([]) ->
+    Flags = #{
+        strategy => one_for_one,
+        intensity => 50,
+        period => 3600
+    },
+    Children = [
+        #{
+            id => couch_js_proc_manager,
+            restart => permanent,
+            shutdown => brutal_kill,
+            start => {couch_js_proc_manager, start_link, []}
+        }
+    ],
+    {ok, {Flags, Children}}.
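
The supervisor above is presumably started from the couch_js application
callback added earlier in this series (not part of this diff); a minimal
sketch of that wiring, with the module name assumed:

    %% Assumed application callback; couch_js_app is not shown in this message.
    -module(couch_js_app).
    -behaviour(application).
    -export([start/2, stop/1]).

    start(_StartType, _StartArgs) ->
        couch_js_sup:start_link().

    stop(_State) ->
        ok.
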


[couchdb] 05/05: Update couch_views to use couch_eval

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davisp pushed a commit to branch prototype/fdb-layer-couch-eval
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 8d01cdf011bbe5dd8149199ff4de2ed2cdb84222
Author: Paul J. Davis <pa...@gmail.com>
AuthorDate: Tue Aug 20 16:16:57 2019 -0500

    Update couch_views to use couch_eval
---
 src/couch_views/src/couch_views.app.src           |  3 +-
 src/couch_views/src/couch_views_indexer.erl       | 65 +++++++++++++++++------
 src/couch_views/test/couch_views_indexer_test.erl |  1 +
 src/couch_views/test/couch_views_map_test.erl     |  7 ++-
 4 files changed, 57 insertions(+), 19 deletions(-)

diff --git a/src/couch_views/src/couch_views.app.src b/src/couch_views/src/couch_views.app.src
index c80c30b..0d666af 100644
--- a/src/couch_views/src/couch_views.app.src
+++ b/src/couch_views/src/couch_views.app.src
@@ -26,6 +26,7 @@
         config,
         couch_stats,
         fabric,
-        couch_jobs
+        couch_jobs,
+        couch_eval
     ]}
 ]}.
diff --git a/src/couch_views/src/couch_views_indexer.erl b/src/couch_views/src/couch_views_indexer.erl
index 83d1b6a..55ce063 100644
--- a/src/couch_views/src/couch_views_indexer.erl
+++ b/src/couch_views/src/couch_views_indexer.erl
@@ -120,7 +120,7 @@ update(#{} = Db, Mrst0, State0) ->
 
     case State4 of
         finished ->
-            couch_query_servers:stop_doc_map(Mrst2#mrst.qserver);
+            couch_eval:release_map_context(Mrst2#mrst.qserver);
         _ ->
             update(Db, Mrst2, State4)
     end.
@@ -171,20 +171,42 @@ map_docs(Mrst, Docs) ->
     % Run all the non-deleted docs through the view engine
     Mrst1 = start_query_server(Mrst),
     QServer = Mrst1#mrst.qserver,
-    MapFun = fun
-        (#{deleted := true} = Change) ->
-            Change#{results => []};
-        (#{deleted := false} = Change) ->
-            #{doc := Doc} = Change,
-            couch_stats:increment_counter([couchdb, mrview, map_doc]),
-            {ok, RawResults} = couch_query_servers:map_doc_raw(QServer, Doc),
-            JsonResults = couch_query_servers:raw_to_ejson(RawResults),
-            ListResults = lists:map(fun(ViewResults) ->
-                [list_to_tuple(Res) || Res <- ViewResults]
-            end, JsonResults),
-            Change#{results => ListResults}
-    end,
-    {Mrst1, lists:map(MapFun, Docs)}.
+
+    {Deleted0, NotDeleted0} = lists:partition(fun(Doc) ->
+        #{deleted := Deleted} = Doc,
+        Deleted
+    end, Docs),
+
+    Deleted1 = lists:map(fun(Doc) ->
+        Doc#{results => []}
+    end, Deleted0),
+
+    DocsToMap = lists:map(fun(Doc) ->
+        #{doc := DocRec} = Doc,
+        DocRec
+    end, NotDeleted0),
+
+    {ok, AllResults} = couch_eval:map_docs(QServer, DocsToMap),
+
+    % The expanded function head here is making an assertion
+    % that the results match the given doc
+    NotDeleted1 = lists:zipwith(fun(#{id := DocId} = Doc, {DocId, Results}) ->
+        Doc#{results => Results}
+    end, NotDeleted0, AllResults),
+
+    % I'm being a bit careful here re-sorting the docs
+    % into the order of the changes feed. Theoretically this is
+    % unnecessary since we're inside a single transaction.
+    % However, if we ever split this up into multiple
+    % transactions, this detail might be important but
+    % easily forgotten.
+    MappedDocs = lists:sort(fun(A, B) ->
+        #{sequence := ASeq} = A,
+        #{sequence := BSeq} = B,
+        ASeq =< BSeq
+    end, Deleted1 ++ NotDeleted1),
+
+    {Mrst1, MappedDocs}.
 
 
 write_docs(TxDb, Mrst, Docs, State) ->
@@ -249,12 +271,21 @@ fetch_docs(Db, Changes) ->
 
 start_query_server(#mrst{qserver = nil} = Mrst) ->
     #mrst{
+        db_name = DbName,
+        idx_name = DDocId,
         language = Language,
+        sig = Sig,
         lib = Lib,
         views = Views
     } = Mrst,
-    Defs = [View#mrview.def || View <- Views],
-    {ok, QServer} = couch_query_servers:start_doc_map(Language, Defs, Lib),
+    {ok, QServer} = couch_eval:acquire_map_context(
+            DbName,
+            DDocId,
+            Language,
+            Sig,
+            Lib,
+            [View#mrview.def || View <- Views]
+        ),
     Mrst#mrst{qserver = QServer};
 
 start_query_server(#mrst{} = Mrst) ->
diff --git a/src/couch_views/test/couch_views_indexer_test.erl b/src/couch_views/test/couch_views_indexer_test.erl
index 02c8cee..20ad0dc 100644
--- a/src/couch_views/test/couch_views_indexer_test.erl
+++ b/src/couch_views/test/couch_views_indexer_test.erl
@@ -52,6 +52,7 @@ setup() ->
     Ctx = test_util:start_couch([
             fabric,
             couch_jobs,
+            couch_js,
             couch_views
         ]),
     Ctx.
diff --git a/src/couch_views/test/couch_views_map_test.erl b/src/couch_views/test/couch_views_map_test.erl
index 0b0ab68..f8ba183 100644
--- a/src/couch_views/test/couch_views_map_test.erl
+++ b/src/couch_views/test/couch_views_map_test.erl
@@ -20,7 +20,12 @@
 
 
 setup() ->
-    test_util:start_couch([fabric, couch_jobs, couch_views]).
+    test_util:start_couch([
+            fabric,
+            couch_jobs,
+            couch_js,
+            couch_views
+        ]).
 
 
 teardown(State) ->