You are viewing a plain-text version of this content; the link to the canonical version was lost in the plain-text conversion.
Posted to commits@couchdb.apache.org by ko...@apache.org on 2009/07/16 22:40:59 UTC

svn commit: r794843 - in /couchdb/trunk/src/couchdb: couch_view_group.erl couch_view_updater.erl

Author: kocolosk
Date: Thu Jul 16 20:40:59 2009
New Revision: 794843

URL: http://svn.apache.org/viewvc?rev=794843&view=rev
Log:
checkpoint long-running view updates

Improves stale=ok view queries, which now see the index advance gradually
instead of only after the whole update finishes. Also reduces the cost of an
updater crash, because indexing can resume from the latest checkpoint instead
of starting over.

Modified:
    couchdb/trunk/src/couchdb/couch_view_group.erl
    couchdb/trunk/src/couchdb/couch_view_updater.erl

Modified: couchdb/trunk/src/couchdb/couch_view_group.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_view_group.erl?rev=794843&r1=794842&r2=794843&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_view_group.erl (original)
+++ couchdb/trunk/src/couchdb/couch_view_group.erl Thu Jul 16 20:40:59 2009
@@ -89,7 +89,8 @@
     case prepare_group(InitArgs, false) of
     {ok, #group{db=Db, fd=Fd}=Group} ->
         couch_db:monitor(Db),
-        Pid = spawn_link(fun()-> couch_view_updater:update(Group) end),
+        Owner = self(),
+        Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group) end),
         {ok, RefCounter} = couch_ref_counter:start([Fd]),
         {ok, #group_state{
                 db_name=couch_db:name(Db),
@@ -127,7 +128,8 @@
             }=State) when RequestSeq > Seq ->
     {ok, Db} = couch_db:open(DbName, []),
     Group2 = Group#group{db=Db},
-    Pid = spawn_link(fun()-> couch_view_updater:update(Group2) end),
+    Owner = self(),
+    Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end),
 
     {noreply, State#group_state{
         updater_pid=Pid,
@@ -205,7 +207,7 @@
     {ok, Db} = couch_db:open(DbName, []),
     Pid = spawn_link(fun() ->
         {_,Ref} = erlang:spawn_monitor(fun() ->
-            couch_view_updater:update(NewGroup#group{db = Db})
+            couch_view_updater:update(nil, NewGroup#group{db = Db})
         end),
         receive
             {'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
@@ -214,7 +216,21 @@
                 gen_server:cast(Pid2, {compact_done, NewGroup2})
         end
     end),
-    {noreply, State#group_state{compactor_pid = Pid}}.
+    {noreply, State#group_state{compactor_pid = Pid}};
+
+handle_cast({partial_update, NewGroup}, State) ->
+    #group_state{
+        db_name = DbName,
+        waiting_commit = WaitingCommit
+    } = State,
+    NewSeq = NewGroup#group.current_seq,
+    ?LOG_INFO("checkpointing view update at seq ~p for ~s ~s", [NewSeq,
+        DbName, NewGroup#group.name]),
+    if not WaitingCommit ->
+        erlang:send_after(1000, self(), delayed_commit);
+    true -> ok
+    end,
+    {noreply, State#group_state{group=NewGroup, waiting_commit=true}}.
 
 handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
     {ok, Db} = couch_db:open(DbName, []),
@@ -254,7 +270,8 @@
         % we still have some waiters, reopen the database and reupdate the index
         {ok, Db2} = couch_db:open(DbName, []),
         Group2 = Group#group{db=Db2},
-        Pid = spawn_link(fun() -> couch_view_updater:update(Group2) end),
+        Owner = self(),
+        Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group2) end),
         {noreply, State#group_state{waiting_commit=true,
                 waiting_list=StillWaiting, group=Group2, updater_pid=Pid}}
     end;
@@ -267,7 +284,8 @@
     ok = couch_db:close(Group#group.db),
     case prepare_group(InitArgs, true) of
     {ok, ResetGroup} ->
-        Pid = spawn_link(fun()-> couch_view_updater:update(ResetGroup) end),
+        Owner = self(),
+        Pid = spawn_link(fun()-> couch_view_updater:update(Owner, ResetGroup) end),
         {noreply, State#group_state{
                 updater_pid=Pid,
                 group=ResetGroup}};

Modified: couchdb/trunk/src/couchdb/couch_view_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_view_updater.erl?rev=794843&r1=794842&r2=794843&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_view_updater.erl (original)
+++ couchdb/trunk/src/couchdb/couch_view_updater.erl Thu Jul 16 20:40:59 2009
@@ -12,11 +12,17 @@
 
 -module(couch_view_updater).
 
--export([update/1]).
+-export([update/2]).
 
 -include("couch_db.hrl").
 
-update(#group{db=#db{name=DbName}=Db,name=GroupName,current_seq=Seq,purge_seq=PurgeSeq}=Group) ->
+update(Owner, Group) ->
+    #group{
+        db = #db{name=DbName} = Db,
+        name = GroupName,
+        current_seq = Seq,
+        purge_seq = PurgeSeq
+    } = Group,
     couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>),
 
     DbPurgeSeq = couch_db:get_purge_seq(Db),
@@ -43,7 +49,7 @@
             fun(DocInfo, _, {ChangesProcessed, Acc}) ->
                 couch_task_status:update("Processed ~p of ~p changes (~p%)",
                         [ChangesProcessed, TotalChanges, (ChangesProcessed*100) div TotalChanges]),
-                {ok, {ChangesProcessed+1, process_doc(Db, DocInfo, Acc)}}
+                {ok, {ChangesProcessed+1, process_doc(Db, Owner, DocInfo, Acc)}}
             end,
             {0, {[], Group2, ViewEmptyKVs, []}}
             ),
@@ -90,9 +96,9 @@
             views=Views2,
             purge_seq=couch_db:get_purge_seq(Db)}.
 
-process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId,design_options=DesignOptions}=Group, ViewKVs,
-        DocIdViewIdKeys}) ->
-    % This fun computes once for each document
+% This fun computes once for each document
+process_doc(Db, Owner, DocInfo, {Docs, Group, ViewKVs, DocIdViewIdKeys}) ->
+    #group{ design_options = DesignOptions } = Group,
 
     #doc_info{id=DocId, revs=[#rev_info{deleted=Deleted}|_]} = DocInfo,
     IncludeDesign = proplists:get_value(<<"include_design">>,
@@ -125,6 +131,9 @@
                 Results, ViewKVs, DocIdViewIdKeys2),
             {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3,
                 DocInfo#doc_info.high_seq),
+            if is_pid(Owner) ->
+                ok = gen_server:cast(Owner, {partial_update, Group2});
+            true -> ok end,
             garbage_collect(),
             ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views],
             {[], Group2, ViewEmptyKeyValues, []};