You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by dc...@apache.org on 2012/12/12 22:10:29 UTC

git commit: COUCHDB-1334 - revert "More efficient communication with the view server"

Updated Branches:
  refs/heads/1.3.x 4f786335b -> a8aff9f91


COUCHDB-1334 - revert "More efficient communication with the view server"

This reverts commit a851c6e
- COUCHDB-1334 breaks with Windows + couchjs in unexplained ways
- reducing to 1 concurrent query server is not sufficient
- Testing with open_port options overlapped_io was not in itself sufficient
- http://erlang.org/doc/man/erlang.html find overlapped_io
- Refer history in COUCHDB-1346


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/a8aff9f9
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/a8aff9f9
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/a8aff9f9

Branch: refs/heads/1.3.x
Commit: a8aff9f91790b0dae347b10b7c11b21f34faa0b2
Parents: 4f78633
Author: Dave Cottlehuber <dc...@apache.org>
Authored: Wed Dec 12 21:37:18 2012 +0100
Committer: Dave Cottlehuber <dc...@apache.org>
Committed: Wed Dec 12 22:09:38 2012 +0100

----------------------------------------------------------------------
 src/couch_mrview/src/couch_mrview_updater.erl |   46 ++++++++------------
 src/couchdb/couch_native_process.erl          |   11 +----
 src/couchdb/couch_os_process.erl              |   38 +----------------
 src/couchdb/couch_query_servers.erl           |   17 ++-----
 4 files changed, 25 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/a8aff9f9/src/couch_mrview/src/couch_mrview_updater.erl
----------------------------------------------------------------------
diff --git a/src/couch_mrview/src/couch_mrview_updater.erl b/src/couch_mrview/src/couch_mrview_updater.erl
index 3014664..9604ea9 100644
--- a/src/couch_mrview/src/couch_mrview_updater.erl
+++ b/src/couch_mrview/src/couch_mrview_updater.erl
@@ -130,42 +130,32 @@ map_docs(Parent, State0) ->
             couch_query_servers:stop_doc_map(State0#mrst.qserver),
             couch_work_queue:close(State0#mrst.write_queue);
         {ok, Dequeued} ->
+            % Run all the non deleted docs through the view engine and
+            % then pass the results on to the writer process.
             State1 = case State0#mrst.qserver of
                 nil -> start_query_server(State0);
                 _ -> State0
             end,
-            {ok, MapResults} = compute_map_results(State1, Dequeued),
-            couch_work_queue:queue(State1#mrst.write_queue, MapResults),
+            QServer = State1#mrst.qserver,
+            DocFun = fun
+                ({nil, Seq, _}, {SeqAcc, Results}) ->
+                    {erlang:max(Seq, SeqAcc), Results};
+                ({Id, Seq, deleted}, {SeqAcc, Results}) ->
+                    {erlang:max(Seq, SeqAcc), [{Id, []} | Results]};
+                ({Id, Seq, Doc}, {SeqAcc, Results}) ->
+                    {ok, Res} = couch_query_servers:map_doc_raw(QServer, Doc),
+                    {erlang:max(Seq, SeqAcc), [{Id, Res} | Results]}
+            end,
+            FoldFun = fun(Docs, Acc) ->
+                update_task(length(Docs)),
+                lists:foldl(DocFun, Acc, Docs)
+            end,
+            Results = lists:foldl(FoldFun, {0, []}, Dequeued),
+            couch_work_queue:queue(State1#mrst.write_queue, Results),
             map_docs(Parent, State1)
     end.
 
 
-compute_map_results(#mrst{qserver = Qs}, Dequeued) ->
-    % Run all the non deleted docs through the view engine and
-    % then pass the results on to the writer process.
-    DocFun = fun
-        ({nil, Seq, _}, {SeqAcc, AccDel, AccNotDel}) ->
-            {erlang:max(Seq, SeqAcc), AccDel, AccNotDel};
-        ({Id, Seq, deleted}, {SeqAcc, AccDel, AccNotDel}) ->
-            {erlang:max(Seq, SeqAcc), [{Id, []} | AccDel], AccNotDel};
-        ({_Id, Seq, Doc}, {SeqAcc, AccDel, AccNotDel}) ->
-            {erlang:max(Seq, SeqAcc), AccDel, [Doc | AccNotDel]}
-    end,
-    FoldFun = fun(Docs, Acc) ->
-        lists:foldl(DocFun, Acc, Docs)
-    end,
-    {MaxSeq, DeletedResults, Docs} =
-        lists:foldl(FoldFun, {0, [], []}, Dequeued),
-    {ok, MapResultList} = couch_query_servers:map_docs_raw(Qs, Docs),
-    NotDeletedResults = lists:zipwith(
-        fun(#doc{id = Id}, MapResults) -> {Id, MapResults} end,
-        Docs,
-        MapResultList),
-    AllMapResults = DeletedResults ++ NotDeletedResults,
-    update_task(length(AllMapResults)),
-    {ok, {MaxSeq, AllMapResults}}.
-
-
 write_results(Parent, State) ->
     case couch_work_queue:dequeue(State#mrst.write_queue) of
         closed ->

http://git-wip-us.apache.org/repos/asf/couchdb/blob/a8aff9f9/src/couchdb/couch_native_process.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_native_process.erl b/src/couchdb/couch_native_process.erl
index b1d51ed..5a32e75 100644
--- a/src/couchdb/couch_native_process.erl
+++ b/src/couchdb/couch_native_process.erl
@@ -42,7 +42,7 @@
 
 -export([start_link/0,init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,
          handle_info/2]).
--export([set_timeout/2, prompt/2, prompt_many/2]).
+-export([set_timeout/2, prompt/2]).
 
 -define(STATE, native_proc_state).
 -record(evstate, {ddocs, funs=[], query_config=[], list_pid=nil, timeout=5000}).
@@ -62,15 +62,6 @@ set_timeout(Pid, TimeOut) ->
 prompt(Pid, Data) when is_list(Data) ->
     gen_server:call(Pid, {prompt, Data}).
 
-prompt_many(Pid, DataList) ->
-    prompt_many(Pid, DataList, []).
-
-prompt_many(_Pid, [], Acc) ->
-    {ok, lists:reverse(Acc)};
-prompt_many(Pid, [Data | Rest], Acc) ->
-    Result = prompt(Pid, Data),
-    prompt_many(Pid, Rest, [Result | Acc]).
-
 % gen_server callbacks
 init([]) ->
     {ok, #evstate{ddocs=dict:new()}}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/a8aff9f9/src/couchdb/couch_os_process.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_os_process.erl b/src/couchdb/couch_os_process.erl
index 3a267be..db62d49 100644
--- a/src/couchdb/couch_os_process.erl
+++ b/src/couchdb/couch_os_process.erl
@@ -14,7 +14,7 @@
 -behaviour(gen_server).
 
 -export([start_link/1, start_link/2, start_link/3, stop/1]).
--export([set_timeout/2, prompt/2, prompt_many/2]).
+-export([set_timeout/2, prompt/2]).
 -export([send/2, writeline/2, readline/1, writejson/2, readjson/1]).
 -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
 
@@ -57,40 +57,6 @@ prompt(Pid, Data) ->
             throw(Error)
     end.
 
-prompt_many(Pid, DataList) ->
-    OsProc = gen_server:call(Pid, get_os_proc, infinity),
-    true = port_connect(OsProc#os_proc.port, self()),
-    try
-        send_many(OsProc, DataList),
-        receive_many(length(DataList), OsProc, [])
-    after
-        % Can throw badarg error, when OsProc Pid is dead or port was closed
-        % by the readline function on error/timeout.
-        (catch port_connect(OsProc#os_proc.port, Pid)),
-        unlink(OsProc#os_proc.port),
-        drop_port_messages(OsProc#os_proc.port)
-    end.
-
-send_many(_OsProc, []) ->
-    ok;
-send_many(#os_proc{writer = Writer} = OsProc, [Data | Rest]) ->
-    Writer(OsProc, Data),
-    send_many(OsProc, Rest).
-
-receive_many(0, _OsProc, Acc) ->
-    {ok, lists:reverse(Acc)};
-receive_many(N, #os_proc{reader = Reader} = OsProc, Acc) ->
-    Line = Reader(OsProc),
-    receive_many(N - 1, OsProc, [Line | Acc]).
-
-drop_port_messages(Port) ->
-    receive
-    {Port, _} ->
-        drop_port_messages(Port)
-    after 0 ->
-        ok
-    end.
-
 % Utility functions for reading and writing
 % in custom functions
 writeline(OsProc, Data) when is_record(OsProc, os_proc) ->
@@ -209,8 +175,6 @@ terminate(_Reason, #os_proc{port=Port}) ->
     catch port_close(Port),
     ok.
 
-handle_call(get_os_proc, _From, OsProc) ->
-    {reply, OsProc, OsProc};
 handle_call({set_timeout, TimeOut}, _From, OsProc) ->
     {reply, ok, OsProc#os_proc{timeout=TimeOut}};
 handle_call({prompt, Data}, _From, OsProc) ->

http://git-wip-us.apache.org/repos/asf/couchdb/blob/a8aff9f9/src/couchdb/couch_query_servers.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_query_servers.erl b/src/couchdb/couch_query_servers.erl
index e29f23b..3b58cbe 100644
--- a/src/couchdb/couch_query_servers.erl
+++ b/src/couchdb/couch_query_servers.erl
@@ -16,7 +16,7 @@
 -export([start_link/0, config_change/1]).
 
 -export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,code_change/3]).
--export([start_doc_map/3, map_docs/2, map_docs_raw/2, stop_doc_map/1, raw_to_ejson/1]).
+-export([start_doc_map/3, map_docs/2, map_doc_raw/2, stop_doc_map/1, raw_to_ejson/1]).
 -export([reduce/3, rereduce/3,validate_doc_update/5]).
 -export([filter_docs/5]).
 -export([filter_view/3]).
@@ -33,7 +33,6 @@
     lang,
     ddoc_keys = [],
     prompt_fun,
-    prompt_many_fun,
     set_timeout_fun,
     stop_fun
 }).
@@ -84,15 +83,10 @@ map_docs(Proc, Docs) ->
         Docs),
     {ok, Results}.
 
-map_docs_raw(Proc, DocList) ->
-    {Mod, Fun} = Proc#proc.prompt_many_fun,
-    CommandList = lists:map(
-        fun(Doc) ->
-            EJson = couch_doc:to_json_obj(Doc, []),
-            [<<"map_doc">>, EJson]
-        end,
-        DocList),
-    Mod:Fun(Proc#proc.pid, CommandList).
+map_doc_raw(Proc, Doc) ->
+    Json = couch_doc:to_json_obj(Doc, []),
+    {ok, proc_prompt_raw(Proc, [<<"map_doc">>, Json])}.
+
 
 stop_doc_map(nil) ->
     ok;
@@ -487,7 +481,6 @@ new_process(Langs, LangLimits, Lang) ->
                        pid=Pid,
                        % Called via proc_prompt, proc_set_timeout, and proc_stop
                        prompt_fun={Mod, prompt},
-                       prompt_many_fun={Mod, prompt_many},
                        set_timeout_fun={Mod, set_timeout},
                        stop_fun={Mod, stop}}};
         _ ->