You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2011/06/23 11:45:39 UTC

svn commit: r1138798 - in /couchdb/branches/1.1.x/src/couchdb: couch_db.hrl couch_view_compactor.erl couch_view_group.erl couch_view_updater.erl

Author: fdmanana
Date: Thu Jun 23 09:45:39 2011
New Revision: 1138798

URL: http://svn.apache.org/viewvc?rev=1138798&view=rev
Log:
Merged revision 1138796 from trunk

    Simpler and safer db open/closing in view group servers

    This makes the opening and closing of databases in the view
    group server friendlier to the db reference counting system,
    avoiding further potential db file leaks after compaction,
    since until now we opened a database in one process and used
    it in another process (view compactor, view updater).

    This significantly reduces the chances of failure when compacting
    very large views as discussed in COUCHDB-994.

    This relates to COUCHDB-926 and COUCHDB-994.


Modified:
    couchdb/branches/1.1.x/src/couchdb/couch_db.hrl
    couchdb/branches/1.1.x/src/couchdb/couch_view_compactor.erl
    couchdb/branches/1.1.x/src/couchdb/couch_view_group.erl
    couchdb/branches/1.1.x/src/couchdb/couch_view_updater.erl

Modified: couchdb/branches/1.1.x/src/couchdb/couch_db.hrl
URL: http://svn.apache.org/viewvc/couchdb/branches/1.1.x/src/couchdb/couch_db.hrl?rev=1138798&r1=1138797&r2=1138798&view=diff
==============================================================================
--- couchdb/branches/1.1.x/src/couchdb/couch_db.hrl (original)
+++ couchdb/branches/1.1.x/src/couchdb/couch_db.hrl Thu Jun 23 09:45:39 2011
@@ -206,7 +206,6 @@
 
 -record(group, {
     sig=nil,
-    db=nil,
     fd=nil,
     name,
     def_lang,

Modified: couchdb/branches/1.1.x/src/couchdb/couch_view_compactor.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/1.1.x/src/couchdb/couch_view_compactor.erl?rev=1138798&r1=1138797&r2=1138798&view=diff
==============================================================================
--- couchdb/branches/1.1.x/src/couchdb/couch_view_compactor.erl (original)
+++ couchdb/branches/1.1.x/src/couchdb/couch_view_compactor.erl Thu Jun 23 09:45:39 2011
@@ -20,14 +20,14 @@
 %% @doc Compacts the views.  GroupId must not include the _design/ prefix
 start_compact(DbName, GroupId) ->
     Pid = couch_view:get_group_server(DbName, <<"_design/",GroupId/binary>>),
-    gen_server:cast(Pid, {start_compact, fun compact_group/2}).
+    gen_server:cast(Pid, {start_compact, fun compact_group/3}).
 
 %%=============================================================================
 %% internal functions
 %%=============================================================================
 
 %% @spec compact_group(Group, NewGroup) -> ok
-compact_group(Group, EmptyGroup) ->
+compact_group(Group, EmptyGroup, DbName) ->
     #group{
         current_seq = Seq,
         id_btree = IdBtree,
@@ -36,15 +36,15 @@ compact_group(Group, EmptyGroup) ->
     } = Group,
 
     #group{
-        db = Db,
         id_btree = EmptyIdBtree,
         views = EmptyViews
     } = EmptyGroup,
 
+    {ok, Db} = couch_db:open_int(DbName, []),
     {ok, {Count, _}} = couch_btree:full_reduce(Db#db.fulldocinfo_by_id_btree),
+    couch_db:close(Db),
 
     <<"_design", ShortName/binary>> = GroupId,
-    DbName = couch_db:name(Db),
     TaskName = <<DbName/binary, ShortName/binary>>,
     couch_task_status:add_task(<<"View Group Compaction">>, TaskName, <<"">>),
 

Modified: couchdb/branches/1.1.x/src/couchdb/couch_view_group.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/1.1.x/src/couchdb/couch_view_group.erl?rev=1138798&r1=1138797&r2=1138798&view=diff
==============================================================================
--- couchdb/branches/1.1.x/src/couchdb/couch_view_group.erl (original)
+++ couchdb/branches/1.1.x/src/couchdb/couch_view_group.erl Thu Jun 23 09:45:39 2011
@@ -78,7 +78,7 @@ start_link(InitArgs) ->
 init({{_, DbName, _} = InitArgs, ReturnPid, Ref}) ->
     process_flag(trap_exit, true),
     try prepare_group(InitArgs, false) of
-    {ok, #group{db=Db, fd=Fd, current_seq=Seq}=Group} ->
+    {ok, Db, #group{fd=Fd, current_seq=Seq}=Group} ->
         case Seq > couch_db:get_update_seq(Db) of
         true ->
             ReturnPid ! {Ref, self(), {error, invalid_view_seq}},
@@ -90,7 +90,7 @@ init({{_, DbName, _} = InitArgs, ReturnP
             {ok, #group_state{
                     db_name=DbName,
                     init_args=InitArgs,
-                    group=Group#group{db=nil},
+                    group=Group,
                     ref_counter=RefCounter}}
         end;
     Error ->
@@ -124,14 +124,11 @@ handle_call({request_group, RequestSeq},
             updater_pid=nil,
             waiting_list=WaitList
             }=State) when RequestSeq > Seq ->
-    {ok, Db} = couch_db:open_int(DbName, []),
-    Group2 = Group#group{db=Db},
     Owner = self(),
-    Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group2) end),
+    Pid = spawn_link(fun()-> couch_view_updater:update(Owner, Group, DbName) end),
 
     {noreply, State#group_state{
         updater_pid=Pid,
-        group=Group2,
         waiting_list=[{From,RequestSeq}|WaitList]
         }, infinity};
 
@@ -166,7 +163,8 @@ handle_cast({start_compact, CompactFun},
     {ok, Db} = couch_db:open_int(DbName, []),
     {ok, Fd} = open_index_file(compact, RootDir, DbName, GroupSig),
     NewGroup = reset_file(Db, Fd, DbName, Group),
-    Pid = spawn_link(fun() -> CompactFun(Group, NewGroup) end),
+    couch_db:close(Db),
+    Pid = spawn_link(fun() -> CompactFun(Group, NewGroup, DbName) end),
     {noreply, State#group_state{compactor_pid = Pid}};
 handle_cast({start_compact, _}, State) ->
     %% compact already running, this is a no-op
@@ -176,7 +174,7 @@ handle_cast({compact_done, #group{curren
         #group_state{group = #group{current_seq=OldSeq}} = State)
         when NewSeq >= OldSeq ->
     #group_state{
-        group = #group{name=GroupId, fd=OldFd, sig=GroupSig} = Group,
+        group = #group{name=GroupId, fd=OldFd, sig=GroupSig},
         init_args = {RootDir, DbName, _},
         updater_pid = UpdaterPid,
         compactor_pid = CompactorPid,
@@ -195,7 +193,7 @@ handle_cast({compact_done, #group{curren
         unlink(UpdaterPid),
         exit(UpdaterPid, view_compaction_complete),
         Owner = self(),
-        spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup) end);
+        spawn_link(fun()-> couch_view_updater:update(Owner, NewGroup, DbName) end);
     true ->
         nil
     end,
@@ -206,19 +204,10 @@ handle_cast({compact_done, #group{curren
     unlink(OldFd),
     couch_ref_counter:drop(RefCounter),
     {ok, NewRefCounter} = couch_ref_counter:start([NewGroup#group.fd]),
-    case Group#group.db of
-        nil -> ok;
-        Else -> couch_db:close(Else)
-    end,
-
-    case NewGroup#group.db of
-        nil -> ok;
-        _ -> couch_db:close(NewGroup#group.db)
-    end,
 
     self() ! delayed_commit,
     {noreply, State#group_state{
-        group=NewGroup#group{db = nil},
+        group=NewGroup,
         ref_counter=NewRefCounter,
         compactor_pid=nil,
         updater_pid=NewUpdaterPid
@@ -230,18 +219,15 @@ handle_cast({compact_done, NewGroup}, St
     } = State,
     ?LOG_INFO("View index compaction still behind for ~s ~s -- current: ~p " ++
         "compact: ~p", [DbName, GroupId, CurrentSeq, NewGroup#group.current_seq]),
-    couch_db:close(NewGroup#group.db),
     Pid = spawn_link(fun() ->
-        {ok, Db} = couch_db:open_int(DbName, []),
         {_,Ref} = erlang:spawn_monitor(fun() ->
-            couch_view_updater:update(nil, NewGroup#group{db = Db})
+            couch_view_updater:update(nil, NewGroup, DbName)
         end),
         receive
             {'DOWN', Ref, _, _, {new_group, NewGroup2}} ->
-                couch_db:close(Db),
                 #group{name=GroupId} = NewGroup2,
                 Pid2 = couch_view:get_group_server(DbName, GroupId),
-                gen_server:cast(Pid2, {compact_done, NewGroup2#group{db = nil}})
+                gen_server:cast(Pid2, {compact_done, NewGroup2})
         end
     end),
     {noreply, State#group_state{compactor_pid = Pid}};
@@ -283,13 +269,12 @@ handle_info(delayed_commit, #group_state
         {noreply, State#group_state{waiting_commit=true}}
     end;
 
-handle_info({'EXIT', FromPid, {new_group, #group{db=Db}=Group}},
+handle_info({'EXIT', FromPid, {new_group, Group}},
         #group_state{db_name=DbName,
             updater_pid=UpPid,
             ref_counter=RefCounter,
             waiting_list=WaitList,
             waiting_commit=WaitingCommit}=State) when UpPid == FromPid ->
-    ok = couch_db:close(Db),
     if not WaitingCommit ->
         erlang:send_after(1000, self(), delayed_commit);
     true -> ok
@@ -297,30 +282,27 @@ handle_info({'EXIT', FromPid, {new_group
     case reply_with_group(Group, WaitList, [], RefCounter) of
     [] ->
         {noreply, State#group_state{waiting_commit=true, waiting_list=[],
-                group=Group#group{db=nil}, updater_pid=nil}};
+                group=Group, updater_pid=nil}};
     StillWaiting ->
         % we still have some waiters, reopen the database and reupdate the index
-        {ok, Db2} = couch_db:open_int(DbName, []),
-        Group2 = Group#group{db=Db2},
         Owner = self(),
-        Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group2) end),
+        Pid = spawn_link(fun() -> couch_view_updater:update(Owner, Group, DbName) end),
         {noreply, State#group_state{waiting_commit=true,
-                waiting_list=StillWaiting, group=Group2, updater_pid=Pid}}
+                waiting_list=StillWaiting, updater_pid=Pid}}
     end;
 handle_info({'EXIT', _, {new_group, _}}, State) ->
     %% message from an old (probably pre-compaction) updater; ignore
     {noreply, State};
 
-handle_info({'EXIT', FromPid, reset},
-        #group_state{
-            init_args=InitArgs,
-            updater_pid=UpPid,
-            group=Group}=State) when UpPid == FromPid ->
-    ok = couch_db:close(Group#group.db),
+handle_info({'EXIT', UpPid, reset},
+        #group_state{init_args=InitArgs, updater_pid=UpPid} = State) ->
     case prepare_group(InitArgs, true) of
-    {ok, ResetGroup} ->
+    {ok, Db, ResetGroup} ->
         Owner = self(),
-        Pid = spawn_link(fun()-> couch_view_updater:update(Owner, ResetGroup) end),
+        couch_db:close(Db),
+        Pid = spawn_link(fun() ->
+            couch_view_updater:update(Owner, ResetGroup, Db#db.name)
+        end),
         {noreply, State#group_state{
                 updater_pid=Pid,
                 group=ResetGroup}};
@@ -386,17 +368,17 @@ prepare_group({RootDir, DbName, #group{s
         {ok, Fd} ->
             if ForceReset ->
                 % this can happen if we missed a purge
-                {ok, reset_file(Db, Fd, DbName, Group)};
+                {ok, Db, reset_file(Db, Fd, DbName, Group)};
             true ->
                 % 09 UPGRADE CODE
                 ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>),
                 case (catch couch_file:read_header(Fd)) of
                 {ok, {Sig, HeaderInfo}} ->
                     % sigs match!
-                    {ok, init_group(Db, Fd, Group, HeaderInfo)};
+                    {ok, Db, init_group(Db, Fd, Group, HeaderInfo)};
                 _ ->
                     % this happens on a new file
-                    {ok, reset_file(Db, Fd, DbName, Group)}
+                    {ok, Db, reset_file(Db, Fd, DbName, Group)}
                 end
             end;
         Error ->
@@ -582,7 +564,7 @@ design_doc_to_view_group(#doc{id=Id,body
 
 reset_group(#group{views=Views}=Group) ->
     Views2 = [View#view{btree=nil} || View <- Views],
-    Group#group{db=nil,fd=nil,query_server=nil,current_seq=0,
+    Group#group{fd=nil,query_server=nil,current_seq=0,
             id_btree=nil,views=Views2}.
 
 reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
@@ -598,7 +580,7 @@ init_group(Db, Fd, #group{views=Views}=G
     init_group(Db, Fd, Group,
         #index_header{seq=0, purge_seq=couch_db:get_purge_seq(Db),
             id_btree_state=nil, view_states=[{nil, 0, 0} || _ <- Views]});
-init_group(Db, Fd, #group{def_lang=Lang,views=Views}=
+init_group(_Db, Fd, #group{def_lang=Lang,views=Views}=
             Group, IndexHeader) ->
      #index_header{seq=Seq, purge_seq=PurgeSeq,
             id_btree_state=IdBtreeState, view_states=ViewStates} = IndexHeader,
@@ -638,5 +620,5 @@ init_group(Db, Fd, #group{def_lang=Lang,
             View#view{btree=Btree, update_seq=USeq, purge_seq=PSeq}
         end,
         ViewStates2, Views),
-    Group#group{db=Db, fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
+    Group#group{fd=Fd, current_seq=Seq, purge_seq=PurgeSeq,
         id_btree=IdBtree, views=Views2}.

Modified: couchdb/branches/1.1.x/src/couchdb/couch_view_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/1.1.x/src/couchdb/couch_view_updater.erl?rev=1138798&r1=1138797&r2=1138798&view=diff
==============================================================================
--- couchdb/branches/1.1.x/src/couchdb/couch_view_updater.erl (original)
+++ couchdb/branches/1.1.x/src/couchdb/couch_view_updater.erl Thu Jun 23 09:45:39 2011
@@ -12,30 +12,31 @@
 
 -module(couch_view_updater).
 
--export([update/2]).
+-export([update/3]).
 
 -include("couch_db.hrl").
 
--spec update(_, #group{}) -> no_return().
+-spec update(_, #group{}, Dbname::binary()) -> no_return().
 
-update(Owner, Group) ->
+update(Owner, Group, DbName) ->
     #group{
-        db = #db{name=DbName} = Db,
         name = GroupName,
         current_seq = Seq,
         purge_seq = PurgeSeq
     } = Group,
     couch_task_status:add_task(<<"View Group Indexer">>, <<DbName/binary," ",GroupName/binary>>, <<"Starting index update">>),
 
+    {ok, Db} = couch_db:open_int(DbName, []),
     DbPurgeSeq = couch_db:get_purge_seq(Db),
     Group2 =
     if DbPurgeSeq == PurgeSeq ->
         Group;
     DbPurgeSeq == PurgeSeq + 1 ->
         couch_task_status:update(<<"Removing purged entries from view index.">>),
-        purge_index(Group);
+        purge_index(Group, Db);
     true ->
         couch_task_status:update(<<"Resetting view index due to lost purge entries.">>),
+        couch_db:close(Db),
         exit(reset)
     end,
     {ok, MapQueue} = couch_work_queue:new(
@@ -73,13 +74,14 @@ update(Owner, Group) ->
     couch_task_status:set_update_frequency(0),
     couch_task_status:update("Finishing."),
     couch_work_queue:close(MapQueue),
+    couch_db:close(Db),
     receive {new_group, NewGroup} ->
         exit({new_group,
                 NewGroup#group{current_seq=couch_db:get_update_seq(Db)}})
     end.
 
 
-purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
+purge_index(#group{views=Views, id_btree=IdBtree}=Group, Db) ->
     {ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
     Ids = [Id || {Id, _Revs} <- PurgedIdsRevs],
     {ok, Lookups, IdBtree2} = couch_btree:query_modify(IdBtree, Ids, [], Ids),