You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ko...@apache.org on 2009/07/22 19:34:14 UTC

svn commit: r796805 - in /couchdb/trunk/src/couchdb: couch_view_group.erl couch_view_updater.erl

Author: kocolosk
Date: Wed Jul 22 17:34:14 2009
New Revision: 796805

URL: http://svn.apache.org/viewvc?rev=796805&view=rev
Log:
various bugfixes and improvements for view compaction

View compaction was broken after the switch to signature-based index
files, but we don't have tests for it yet, so we didn't notice.  I also
committed a few changes that should make compaction faster and more
robust:

* commit the header immediately after compaction finishes.  We used
  to wait 1 second, but if the server restarted in that second the
  index would be reset.

* unlink from the old index file at the end. This prevents process
  crashes that could crash couch_view (and, combined with the delayed
  commit, would sometimes cause index resets).
  
* don't wait for running view updates to finish before replacing the
  old view index file.  If an update is running, restart it and point it
  to the new view group.  This alleviates the situation where view
  compaction goes into a busy wait, printing "still behind main file"
  1000s of times to the log, and generally makes compaction finish more
  quickly.
  
* better logging

Modified:
    couchdb/trunk/src/couchdb/couch_view_group.erl
    couchdb/trunk/src/couchdb/couch_view_updater.erl

Modified: couchdb/trunk/src/couchdb/couch_view_group.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_view_group.erl?rev=796805&r1=796804&r2=796805&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_view_group.erl (original)
+++ couchdb/trunk/src/couchdb/couch_view_group.erl Wed Jul 22 17:34:14 2009
@@ -152,11 +152,15 @@
     GroupInfo = get_group_info(Group, CompactorPid),
     {reply, {ok, GroupInfo}, State}.
 
-handle_cast({start_compact, CompactFun}, #group_state{ compactor_pid=nil,
-        group=Group, init_args={view, RootDir, DbName, GroupId} } = State) ->
-    ?LOG_INFO("Starting view group compaction", []),
+handle_cast({start_compact, CompactFun}, #group_state{compactor_pid=nil}
+        = State) ->
+    #group_state{
+        group = #group{name = GroupId, sig = GroupSig} = Group,
+        init_args = {RootDir, DbName, _}
+    } = State,
+    ?LOG_INFO("View index compaction starting for ~s ~s", [DbName, GroupId]),
     {ok, Db} = couch_db:open(DbName, []),
-    {ok, Fd} = open_index_file(RootDir, DbName, <<GroupId/binary,".compact">>),
+    {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig),
     NewGroup = reset_file(Db, Fd, DbName, Group),
     Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end),
     {noreply, State#group_state{compactor_pid = Pid}};
@@ -164,36 +168,56 @@
     %% compact already running, this is a no-op
     {noreply, State};
 
-handle_cast({compact_done, #group{fd=NewFd, current_seq=NewSeq} = NewGroup},
-        #group_state{
-            group = #group{current_seq=OldSeq, sig=GroupSig} = Group,
-            init_args = {view, RootDir, DbName, _GroupId},
-            updater_pid = nil,
-            ref_counter = RefCounter
-        } = State) when NewSeq >= OldSeq ->
-    ?LOG_INFO("View Group compaction complete", []),
+handle_cast({compact_done, #group{current_seq=NewSeq} = NewGroup},
+        #group_state{group = #group{current_seq=OldSeq}} = State)
+        when NewSeq >= OldSeq ->
+    #group_state{
+        group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group,
+        init_args = {RootDir, DbName, _}, 
+        updater_pid = UpdaterPid,
+        ref_counter = RefCounter
+    } = State,
+
+    ?LOG_INFO("View index compaction complete for ~s ~s", [DbName, GroupId]),
     FileName = index_file_name(RootDir, DbName, GroupSig),
     CompactName = index_file_name(compact, RootDir, DbName, GroupSig),
     file:delete(FileName),
     ok = file:rename(CompactName, FileName),
 
+    %% if an updater is running, kill it and start a new one
+    NewUpdaterPid =
+    if is_pid(UpdaterPid) ->
+        unlink(UpdaterPid),
+        exit(UpdaterPid, view_compaction_complete),
+        Owner = self(),
+        spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end);
+    true ->
+        nil
+    end,
+
     %% cleanup old group
+    unlink(OldFd),
     couch_ref_counter:drop(RefCounter),
-    {ok, NewRefCounter} = couch_ref_counter:start([NewFd]),
+    {ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]),
     case Group#group.db of
         nil -> ok;
         Else -> couch_db:close(Else)
     end,
 
-    erlang:send_after(1000, self(), delayed_commit),
+    self() ! delayed_commit,
     {noreply, State#group_state{
         group=NewGroup,
         ref_counter=NewRefCounter,
-        compactor_pid=nil
+        compactor_pid=nil,
+        updater_pid=NewUpdaterPid
     }};
-handle_cast({compact_done, NewGroup}, #group_state{
-        init_args={view, _RootDir, DbName, GroupId} } = State) ->
-    ?LOG_INFO("View index compaction still behind main file", []),
+handle_cast({compact_done, NewGroup}, State) ->
+    #group_state{
+        group = #group{name = GroupId, current_seq = CurrentSeq},
+        init_args={_RootDir, DbName, _}
+    } = State,
+    ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++
+        "compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]),
     couch_db:close(NewGroup#group.db),
     {ok, Db} = couch_db:open(DbName, []),
     Pid = spawn_link(fun() ->
@@ -209,7 +233,8 @@
     end),
     {noreply, State#group_state{compactor_pid = Pid}};
 
-handle_cast({partial_update, NewGroup}, State) ->
+handle_cast({partial_update, Pid, NewGroup}, #group_state{updater_pid=Pid}
+        = State) ->
     #group_state{
         db_name = DbName,
         waiting_commit = WaitingCommit
@@ -221,7 +246,10 @@
         erlang:send_after(1000, self(), delayed_commit);
     true -> ok
     end,
-    {noreply, State#group_state{group=NewGroup, waiting_commit=true}}.
+    {noreply, State#group_state{group=NewGroup, waiting_commit=true}};
+handle_cast({partial_update, _, _}, State) ->
+    %% message from an old (probably pre-compaction) updater; ignore
+    {noreply, State}.
 
 handle_info(delayed_commit, #group_state{db_name=DbName,group=Group}=State) ->
     {ok, Db} = couch_db:open(DbName, []),
@@ -266,6 +294,9 @@
         {noreply, State#group_state{waiting_commit=true,
                 waiting_list=StillWaiting, group=Group2, updater_pid=Pid}}
     end;
+handle_info({'EXIT', _, {new_group, _}}, State) ->
+    %% message from an old (probably pre-compaction) updater; ignore
+    {noreply, State};
 
 handle_info({'EXIT', FromPid, reset},
         #group_state{
@@ -283,7 +314,10 @@
     Error ->
         {stop, normal, reply_all(State, Error)}
     end;
-
+handle_info({'EXIT', _, reset}, State) ->
+    %% message from an old (probably pre-compaction) updater; ignore
+    {noreply, State};
+    
 handle_info({'EXIT', _FromPid, normal}, State) ->
     {noreply, State};
 
@@ -389,6 +423,14 @@
     Error           -> Error
     end.
 
+open_index_file(compact, RootDir, DbName, GroupSig) ->
+    FileName = index_file_name(compact, RootDir, DbName, GroupSig),
+    case couch_file:open(FileName) of
+    {ok, Fd}        -> {ok, Fd};
+    {error, enoent} -> couch_file:open(FileName, [create]);
+    Error           -> Error
+    end.
+
 open_temp_group(DbName, Language, DesignOptions, MapSrc, RedSrc) ->
     case couch_db:open(DbName, []) of
     {ok, Db} ->

Modified: couchdb/trunk/src/couchdb/couch_view_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_view_updater.erl?rev=796805&r1=796804&r2=796805&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_view_updater.erl (original)
+++ couchdb/trunk/src/couchdb/couch_view_updater.erl Wed Jul 22 17:34:14 2009
@@ -132,7 +132,7 @@
             {ok, Group2} = write_changes(Group1, ViewKVs3, DocIdViewIdKeys3,
                 DocInfo#doc_info.high_seq),
             if is_pid(Owner) ->
-                ok = gen_server:cast(Owner, {partial_update, Group2});
+                ok = gen_server:cast(Owner, {partial_update, self(), Group2});
             true -> ok end,
             garbage_collect(),
             ViewEmptyKeyValues = [{View, []} || View <- Group2#group.views],