You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2009/09/02 04:59:49 UTC

svn commit: r810345 - in /couchdb/trunk/src/couchdb: couch_view_updater.erl couch_work_queue.erl

Author: damien
Date: Wed Sep  2 02:59:49 2009
New Revision: 810345

URL: http://svn.apache.org/viewvc?rev=810345&view=rev
Log:
Added a 3-stage pipeline for indexing views (loader, mapper, writer). This results in better performance and resource utilization.

Added:
    couchdb/trunk/src/couchdb/couch_work_queue.erl
Modified:
    couchdb/trunk/src/couchdb/couch_view_updater.erl

Modified: couchdb/trunk/src/couchdb/couch_view_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_view_updater.erl?rev=810345&r1=810344&r2=810345&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_view_updater.erl (original)
+++ couchdb/trunk/src/couchdb/couch_view_updater.erl Wed Sep  2 02:59:49 2009
@@ -36,33 +36,43 @@
         couch_task_status:update(<<"Resetting view index due to lost purge entries.">>),
         exit(reset)
     end,
-
+    {ok, MapQueue} = couch_work_queue:new(100000, 500),
+    {ok, WriteQueue} = couch_work_queue:new(100000, 500),
+    Self = self(),
     ViewEmptyKVs = [{View, []} || View <- Group2#group.views],
+    spawn_link(fun() -> do_maps(Group, MapQueue, WriteQueue, ViewEmptyKVs) end),
+    spawn_link(fun() -> do_writes(Self, Owner, Group2, WriteQueue) end),
     % compute on all docs modified since we last computed.
     TotalChanges = couch_db:count_changes_since(Db, Seq),
     % update status every half second
     couch_task_status:set_update_frequency(500),
-    {ok, {_,{UncomputedDocs, Group3, ViewKVsToAdd, DocIdViewIdKeys}}}
+    #group{ design_options = DesignOptions } = Group,
+    IncludeDesign = proplists:get_value(<<"include_design">>,
+        DesignOptions, false),
+    LocalSeq = proplists:get_value(<<"local_seq">>, DesignOptions, false),
+    DocOpts =
+    case LocalSeq of
+    true -> [conflicts, deleted_conflicts, local_seq];
+    _ -> [conflicts, deleted_conflicts]
+    end,
+    {ok, _}
         = couch_db:enum_docs_since(
             Db,
             Seq,
-            fun(DocInfo, _, {ChangesProcessed, Acc}) ->
+            fun(DocInfo, _, ChangesProcessed) ->
                 couch_task_status:update("Processed ~p of ~p changes (~p%)",
                         [ChangesProcessed, TotalChanges, (ChangesProcessed*100) div TotalChanges]),
-                {ok, {ChangesProcessed+1, process_doc(Db, Owner, DocInfo, Acc)}}
+                load_doc(Db, DocInfo, MapQueue, DocOpts, IncludeDesign),
+                {ok, ChangesProcessed+1}
             end,
-            {0, {[], Group2, ViewEmptyKVs, []}}
-            ),
+            0),
     couch_task_status:set_update_frequency(0),
     couch_task_status:update("Finishing."),
-    {Group4, Results} = view_compute(Group3, UncomputedDocs),
-    {ViewKVsToAdd2, DocIdViewIdKeys2} = view_insert_query_results(
-            UncomputedDocs, Results, ViewKVsToAdd, DocIdViewIdKeys),
-    couch_query_servers:stop_doc_map(Group4#group.query_server),
-    NewSeq = couch_db:get_update_seq(Db),
-    {ok, Group5} = write_changes(Group4, ViewKVsToAdd2, DocIdViewIdKeys2,
-                NewSeq),
-    exit({new_group, Group5#group{query_server=nil}}).
+    couch_work_queue:close(MapQueue),
+    receive {new_group, NewGroup} ->
+        exit({new_group,
+                NewGroup#group{current_seq=couch_db:get_update_seq(Db)}})
+    end.
 
 
 purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
@@ -96,51 +106,61 @@
             views=Views2,
             purge_seq=couch_db:get_purge_seq(Db)}.
 
-% This fun computes once for each document
-process_doc(Db, Owner, DocInfo, {Docs, Group, ViewKVs, DocIdViewIdKeys}) ->
-    #group{ design_options = DesignOptions } = Group,
 
-    #doc_info{id=DocId, revs=[#rev_info{deleted=Deleted}|_]} = DocInfo,
-    IncludeDesign = proplists:get_value(<<"include_design">>,
-        DesignOptions, false),
-    LocalSeq = proplists:get_value(<<"local_seq">>,
-        DesignOptions, false),
-    DocOpts = case LocalSeq of
-        true ->
-            [conflicts, deleted_conflicts, local_seq];
-        _ ->
-            [conflicts, deleted_conflicts]
-    end,
+load_doc(Db, DocInfo, MapQueue, DocOpts, IncludeDesign) ->
+    #doc_info{id=DocId, high_seq=Seq, revs=[#rev_info{deleted=Deleted}|_]} = DocInfo,
     case {IncludeDesign, DocId} of
     {false, <<?DESIGN_DOC_PREFIX, _/binary>>} -> % we skip design docs
-        {Docs, Group, ViewKVs, DocIdViewIdKeys};
+        ok;
     _ ->
-        {Docs2, DocIdViewIdKeys2} =
         if Deleted ->
-            {Docs, [{DocId, []} | DocIdViewIdKeys]};
-        true ->
-            {ok, Doc} = couch_db:open_doc_int(Db, DocInfo,
-                DocOpts),
-            {[Doc | Docs], DocIdViewIdKeys}
-        end,
-
-        case couch_util:should_flush() of
+            couch_work_queue:queue(MapQueue, {Seq, #doc{id=DocId, deleted=true}});
         true ->
-            {Group1, Results} = view_compute(Group, Docs2),
-            {ViewKVs3, DocIdViewIdKeys3} = view_insert_query_results(Docs2,
-                Results, ViewKVs, DocIdViewIdKeys2),
-            {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3,
-                DocInfo#doc_info.high_seq),
-            if is_pid(Owner) ->
-                ok = gen_server:cast(Owner, {partial_update, self(), Group2});
-            true -> ok end,
-            garbage_collect(),
-            ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views],
-            {[], Group2, ViewEmptyKeyValues, []};
-        false ->
-            {Docs2, Group, ViewKVs, DocIdViewIdKeys2}
+            {ok, Doc} = couch_db:open_doc_int(Db, DocInfo, DocOpts),
+            couch_work_queue:queue(MapQueue, {Seq, Doc})
         end
     end.
+    
+do_maps(Group, MapQueue, WriteQueue, ViewEmptyKVs) -> % mapper stage: run map funs over loaded docs
+    case couch_work_queue:dequeue(MapQueue) of
+    {ok, Items} -> % Items are {Seq, #doc{}} pairs queued by load_doc/5
+        LiveDocs = [D || {_Seq, #doc{deleted=false}=D} <- Items],
+        Tombstones = [{DocId, []} || {_Seq, #doc{deleted=true,id=DocId}} <- Items],
+        MaxSeq = lists:max([S || {S, _} <- Items]),
+        {MappedGroup, MapResults} = view_compute(Group, LiveDocs),
+        {KVsByView, IdKeys} = view_insert_query_results(LiveDocs, MapResults,
+            ViewEmptyKVs, Tombstones), % deleted docs start with an empty key list
+        couch_work_queue:queue(WriteQueue, {MaxSeq, KVsByView, IdKeys}),
+        do_maps(MappedGroup, MapQueue, WriteQueue, ViewEmptyKVs);
+    closed -> % loader finished: close the writer's input, then stop_doc_map on the query server
+        couch_work_queue:close(WriteQueue),
+        couch_query_servers:stop_doc_map(Group#group.query_server)
+    end.
+
+do_writes(Parent, Owner, Group, WriteQueue) -> % writer stage: flush mapped batches into the view group
+    case couch_work_queue:dequeue(WriteQueue) of
+    closed -> % mapper closed the queue: hand the final group back to the updater
+        Parent ! {new_group, Group};
+    {ok, Batches} -> % merge every {Seq, ViewKVs, DocIdViewIdKeys} batch into a single write
+        {NewSeq, ViewKeyValues, DocIdViewIdKeys} = lists:foldl(
+            fun({Seq, ViewKVs, IdKeys}, nil) ->
+                {Seq, ViewKVs, IdKeys};
+            ({Seq, ViewKVs, IdKeys}, Acc) ->
+                {Seq2, AccViewKVs, AccIdKeys} = Acc,
+                AccViewKVs2 = lists:zipwith( % pairs the per-view KV lists positionally
+                    fun({View, KVsIn}, {_View, KVsAcc}) ->
+                        {View, KVsIn ++ KVsAcc}
+                    end, ViewKVs, AccViewKVs),
+                {lists:max([Seq, Seq2]), % highest seq becomes the group's current_seq
+                        AccViewKVs2, IdKeys ++ AccIdKeys}
+            end, nil, Batches),
+        Group2 = write_changes(Group, ViewKeyValues, DocIdViewIdKeys, NewSeq),
+        case Owner of % nil Owner: skip the partial_update notification
+        nil -> ok;
+        _ -> ok = gen_server:cast(Owner, {partial_update, Parent, Group2}) % tag with the updater pid, not this writer's
+        end,
+        do_writes(Parent, Owner, Group2, WriteQueue) % keep Owner so every written batch is reported
+    end.
 
 view_insert_query_results([], [], ViewKVs, DocIdViewIdKeysAcc) ->
     {ViewKVs, DocIdViewIdKeysAcc};
@@ -214,16 +234,11 @@
             end
         end,
         dict:new(), LookupResults),
-
-    Views2 = [
-        begin
+    Views2 = lists:zipwith(fun(View, {_View, AddKeyValues}) ->
             KeysToRemove = couch_util:dict_find(View#view.id_num, KeysToRemoveByView, []),
             {ok, ViewBtree2} = couch_btree:add_remove(View#view.btree, AddKeyValues, KeysToRemove),
             View#view{btree = ViewBtree2}
-        end
-    ||
-        {View, AddKeyValues} <- ViewKeyValuesToAdd
-    ],
-    Group2 = Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2},
-    {ok, Group2}.
+        end,    Group#group.views, ViewKeyValuesToAdd),
+    Group#group{views=Views2, current_seq=NewSeq, id_btree=IdBtree2}.
+
 

Added: couchdb/trunk/src/couchdb/couch_work_queue.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_work_queue.erl?rev=810345&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_work_queue.erl (added)
+++ couchdb/trunk/src/couchdb/couch_work_queue.erl Wed Sep  2 02:59:49 2009
@@ -0,0 +1,112 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Bounded work queue between producers and a single consumer. Producers block
+% in queue/2 once the buffered item count reaches max_items or the summed
+% term_to_binary/1 size reaches max_size; dequeue/1 hands the whole buffer to
+% the consumer at once and releases every blocked producer.
+-module(couch_work_queue).
+-behaviour(gen_server).
+
+-export([new/2,queue/2,dequeue/1,close/1]).
+-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
+
+-record(q, {
+    buffer=[],             % queued items, most recently queued first
+    blocked=[],            % From tags of producers whose reply is deferred
+    max_size,              % summed term_to_binary/1 size that blocks producers
+    max_items,             % buffered item count that blocks producers
+    items=0,               % number of items in buffer
+    size=0,                % summed term_to_binary/1 size of buffer
+    work_waiter=nil,       % From tag of a consumer blocked in dequeue/1, or nil
+    close_on_dequeue=false % set by close/1 while items were still buffered
+}).
+
+% Starts a queue process linked to the caller.
+new(MaxSize, MaxItems) ->
+    gen_server:start_link(couch_work_queue, {MaxSize, MaxItems}, []).
+
+% Adds Item and returns ok; if Item makes the buffer reach a limit, the reply
+% is held back until the consumer next drains the buffer.
+queue(Wq, Item) ->
+    gen_server:call(Wq, {queue, Item}, infinity).
+
+% Returns {ok, Items} (most recently queued first), waiting while the queue is
+% empty, or closed once the queue has been closed and drained.
+dequeue(Wq) ->
+    try gen_server:call(Wq, dequeue, infinity)
+    catch
+        % gen_server:call/3 reports a stopped or missing queue process as an
+        % exit; only that is mapped to closed, other exceptions propagate.
+        exit:_ -> closed
+    end.
+
+% Asynchronously closes the queue: anything still buffered is returned by
+% the next dequeue/1, after which the queue process stops.
+close(Wq) ->
+    gen_server:cast(Wq, close).
+
+init({MaxSize, MaxItems}) ->
+    {ok, #q{max_size=MaxSize, max_items=MaxItems}}.
+
+% A consumer still waiting in dequeue/1 is answered with closed.
+terminate(_Reason, #q{work_waiter=nil}) ->
+    ok;
+terminate(_Reason, #q{work_waiter=WW}) ->
+    gen_server:reply(WW, closed).
+
+% No consumer waiting: buffer the item, deferring the producer's reply when
+% a size or item limit has been reached.
+handle_call({queue, Item}, From, #q{work_waiter=nil}=Q0) ->
+    Q = Q0#q{size=Q0#q.size + byte_size(term_to_binary(Item)),
+                items=Q0#q.items + 1,
+                buffer=[Item | Q0#q.buffer]},
+    case (Q#q.size >= Q#q.max_size) orelse
+            (Q#q.items >= Q#q.max_items) of
+    true ->
+        {noreply, Q#q{blocked=[From | Q#q.blocked]}};
+    false ->
+        {reply, ok, Q}
+    end;
+% A consumer is already waiting (so nothing is buffered): hand it the item.
+handle_call({queue, Item}, _From, #q{work_waiter=WW}=Q) ->
+    gen_server:reply(WW, {ok, [Item]}),
+    {reply, ok, Q#q{work_waiter=nil}};
+handle_call(dequeue, _From, #q{work_waiter=WW}) when WW /= nil ->
+    exit("Only one caller allowed to wait for work at a time");
+handle_call(dequeue, From, #q{items=0}=Q) ->
+    {noreply, Q#q{work_waiter=From}};
+% Hand over the whole buffer, release blocked producers and reset to an empty
+% queue with the same limits; stop afterwards if close/1 was requested.
+handle_call(dequeue, _From, #q{buffer=Buff, blocked=Blocked, max_size=MaxSize,
+        max_items=MaxItems, close_on_dequeue=Close}) ->
+    lists:foreach(fun(Producer) -> gen_server:reply(Producer, ok) end, Blocked),
+    Empty = #q{max_size=MaxSize, max_items=MaxItems},
+    if Close ->
+        {stop, normal, {ok, Buff}, Empty};
+    true ->
+        {reply, {ok, Buff}, Empty}
+    end.
+
+% With nothing buffered, close stops the queue at once (terminate/2 answers a
+% waiting consumer); otherwise stopping is deferred to the next dequeue/1.
+handle_cast(close, #q{buffer=[]}=Q) ->
+    {stop, normal, Q};
+handle_cast(close, Q) ->
+    {noreply, Q#q{close_on_dequeue=true}}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+% Any other message stops the queue, using the message as the exit reason.
+handle_info(X, Q) ->
+    {stop, X, Q}.