You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2008/08/05 03:43:41 UTC

svn commit: r682560 - in /incubator/couchdb/trunk/src/couchdb: couch_db.erl couch_db.hrl couch_db_updater.erl couch_file.erl couch_httpd.erl couch_rep.erl couch_server.erl couch_server_sup.erl couch_view.erl

Author: damien
Date: Mon Aug  4 18:43:40 2008
New Revision: 682560

URL: http://svn.apache.org/viewvc?rev=682560&view=rev
Log:
Added a limit on the number of concurrently open databases and an LRU cache for closing old databases when the limit is reached (configurable via the MaxDbsOpen variable in couch.ini). Refactored the db update code in couch_db.erl into couch_db_updater.erl.

Added:
    incubator/couchdb/trunk/src/couchdb/couch_db_updater.erl
Modified:
    incubator/couchdb/trunk/src/couchdb/couch_db.erl
    incubator/couchdb/trunk/src/couchdb/couch_db.hrl
    incubator/couchdb/trunk/src/couchdb/couch_file.erl
    incubator/couchdb/trunk/src/couchdb/couch_httpd.erl
    incubator/couchdb/trunk/src/couchdb/couch_rep.erl
    incubator/couchdb/trunk/src/couchdb/couch_server.erl
    incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl
    incubator/couchdb/trunk/src/couchdb/couch_view.erl

Modified: incubator/couchdb/trunk/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_db.erl?rev=682560&r1=682559&r2=682560&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_db.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_db.erl Mon Aug  4 18:43:40 2008
@@ -13,63 +13,24 @@
 -module(couch_db).
 -behaviour(gen_server).
 
--export([open/2,create/2,create/3,get_doc_info/2,start_compact/1]).
--export([save_docs/2, save_docs/3, get_db_info/1, update_doc/3, update_docs/2, update_docs/3]).
--export([delete_doc/3,open_doc/2,open_doc/3,enum_docs_since/4,enum_docs_since/5]).
--export([enum_docs/4,enum_docs/5, open_doc_revs/4, get_missing_revs/2]).
+-export([open/2,close/1,create/2,start_compact/1,get_db_info/1]).
+-export([open_ref_counted/2,num_refs/1,monitor/1]).
+-export([save_docs/3,update_doc/3,update_docs/2,update_docs/3,delete_doc/3]).
+-export([get_doc_info/2,open_doc/2,open_doc/3,open_doc_revs/4]).
+-export([get_missing_revs/2]).
+-export([enum_docs/4,enum_docs/5,enum_docs_since/4,enum_docs_since/5]).
 -export([enum_docs_since_reduce_to_count/1,enum_docs_reduce_to_count/1]).
 -export([increment_update_seq/1]).
--export([start_update_loop/2]).
+-export([start_link/3]).
 -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
--export([start_copy_compact_int/2]).
 
--export([btree_by_id_split/1,
-            btree_by_id_join/2,
-            btree_by_id_reduce/2,
-            btree_by_seq_split/1,
-            btree_by_seq_join/2,
-            btree_by_seq_reduce/2]).
 
 -include("couch_db.hrl").
 
--record(db_header,
-    {write_version = 0,
-     update_seq = 0,
-     summary_stream_state = nil,
-     fulldocinfo_by_id_btree_state = nil,
-     docinfo_by_seq_btree_state = nil,
-     local_docs_btree_state = nil,
-     doc_count=0,
-     doc_del_count=0
-    }).
-
--record(db,
-    {main_pid=nil,
-    update_pid=nil,
-    compactor_pid=nil,
-    fd,
-    header = #db_header{},
-    summary_stream,
-    fulldocinfo_by_id_btree,
-    docinfo_by_seq_btree,
-    local_docs_btree,
-    update_seq,
-    doc_count,
-    doc_del_count,
-    name,
-    filepath
-    }).
-
-% small value used in revision trees to indicate the revision isn't stored
--define(REV_MISSING, []).
-
--define(HEADER_SIG, <<$g, $m, $k, 0>>).
-
 start_link(DbName, Filepath, Options) ->
     catch start_link0(DbName, Filepath, Options).
         
 start_link0(DbName, Filepath, Options) ->
-     % first delete the old file previous compaction
     Fd = 
     case couch_file:open(Filepath, Options) of
     {ok, Fd0} ->
@@ -105,33 +66,38 @@
     end,
     StartResult.
 
-%%% Interface functions %%%
 
-create(Filepath, Options) ->
-    create(Filepath, Filepath, Options).
+create(DbName, Options) ->
+    couch_server:create(DbName, Options).
+
+open(DbName, Options) ->
+    couch_server:open(DbName, Options).
 
-create(DbName, Filepath, Options) when is_list(Options) ->
-    start_link(DbName, Filepath, [create | Options]).
+close(#db{fd=Fd}) ->
+    couch_file:drop_ref(Fd).
 
-open(DbName, Filepath) ->
-    start_link(DbName, Filepath, []).
+open_ref_counted(MainPid, OpeningPid) ->
+    gen_server:call(MainPid, {open_ref_counted_instance, OpeningPid}).
 
+num_refs(MainPid) ->
+    gen_server:call(MainPid, num_refs).
 
-% Compaction still needs work. Right now readers and writers can get an error 
-% file compaction changeover. This doesn't need to be the case.
-start_compact(MainPid) ->
-    gen_server:cast(MainPid, start_compact).
+monitor(#db{main_pid=MainPid}) ->
+    erlang:monitor(process, MainPid).
 
-delete_doc(MainPid, Id, Revisions) ->
+start_compact(#db{update_pid=Pid}) ->
+    gen_server:cast(Pid, start_compact).
+
+delete_doc(Db, Id, Revisions) ->
     DeletedDocs = [#doc{id=Id, revs=[Rev], deleted=true} || Rev <- Revisions],
-    {ok, [Result]} = update_docs(MainPid, DeletedDocs, []),
+    {ok, [Result]} = update_docs(Db, DeletedDocs, []),
     {ok, Result}.
 
-open_doc(MainPid, IdOrDocInfo) ->
-    open_doc(MainPid, IdOrDocInfo, []).
+open_doc(Db, IdOrDocInfo) ->
+    open_doc(Db, IdOrDocInfo, []).
 
-open_doc(MainPid, Id, Options) ->
-    case open_doc_int(get_db(MainPid), Id, Options) of
+open_doc(Db, Id, Options) ->
+    case open_doc_int(Db, Id, Options) of
     {ok, #doc{deleted=true}=Doc} ->
         case lists:member(deleted, Options) of
         true ->
@@ -143,13 +109,13 @@
         Else
     end.
 
-open_doc_revs(MainPid, Id, Revs, Options) ->
-    [Result] = open_doc_revs_int(get_db(MainPid), [{Id, Revs}], Options),
+open_doc_revs(Db, Id, Revs, Options) ->
+    [Result] = open_doc_revs_int(Db, [{Id, Revs}], Options),
     Result.
 
-get_missing_revs(MainPid, IdRevsList) ->
+get_missing_revs(Db, IdRevsList) ->
     Ids = [Id1 || {Id1, _Revs} <- IdRevsList],
-    FullDocInfoResults = get_full_doc_infos(MainPid, Ids),
+    FullDocInfoResults = get_full_doc_infos(Db, Ids),
     Results = lists:zipwith(
         fun({Id, Revs}, FullDocInfoResult) ->
             case FullDocInfoResult of
@@ -177,18 +143,12 @@
     [Result] = get_full_doc_infos(Db, [Id]),
     Result.
 
-
-get_full_doc_infos(MainPid, Ids) when is_pid(MainPid) ->
-    get_full_doc_infos(get_db(MainPid), Ids);
-get_full_doc_infos(#db{}=Db, Ids) ->
+get_full_doc_infos(Db, Ids) ->
     couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids).
 
-increment_update_seq(MainPid) ->
-    gen_server:call(MainPid, increment_update_seq).
-        
+increment_update_seq(#db{update_pid=UpdatePid}) ->
+    gen_server:call(UpdatePid, increment_update_seq).
         
-get_db_info(MainPid) when is_pid(MainPid) ->
-    get_db_info(get_db(MainPid));
 get_db_info(Db) ->
     #db{fd=Fd,
         compactor_pid=Compactor,
@@ -205,12 +165,12 @@
         ],
     {ok, InfoList}.
 
-update_doc(MainPid, Doc, Options) ->
-    {ok, [NewRev]} = update_docs(MainPid, [Doc], Options),
+update_doc(Db, Doc, Options) ->
+    {ok, [NewRev]} = update_docs(Db, [Doc], Options),
     {ok, NewRev}.
 
-update_docs(MainPid, Docs) ->
-    update_docs(MainPid, Docs, []).
+update_docs(Db, Docs) ->
+    update_docs(Db, Docs, []).
     
 % group_alike_docs groups the sorted documents into sublist buckets, by id.
 % ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
@@ -263,7 +223,7 @@
         end
     end.
 
-update_docs(MainPid, Docs, Options) ->
+update_docs(#db{update_pid=UpdatePid}=Db, Docs, Options) ->
     % go ahead and generate the new revision ids for the documents.
     Docs2 = lists:map(
         fun(#doc{id=Id,revs=Revs}=Doc) ->
@@ -278,7 +238,6 @@
     NewRevs = [NewRev || #doc{revs=[NewRev|_]} <- Docs2],
     DocBuckets = group_alike_docs(Docs2),
     Ids = [Id || [#doc{id=Id}|_] <- DocBuckets],
-    Db = get_db(MainPid),
     
     % lookup the doc by id and get the most recent
     
@@ -298,13 +257,14 @@
     % flush unwritten binaries to disk.
     DocBuckets3 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets2],
 
-    case gen_server:call(MainPid, {update_docs, DocBuckets3, [new_edits | Options]}, infinity) of
+    case gen_server:call(UpdatePid, {update_docs, DocBuckets3, [new_edits | Options]}, infinity) of
     ok -> {ok, NewRevs};
     retry ->
-        Db2 = get_db(MainPid),
+        Db2 = open_ref_counted(Db#db.main_pid, self()),
         DocBuckets4 = [[doc_flush_binaries(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets3],
         % We only retry once
-        case gen_server:call(MainPid, {update_docs, DocBuckets4, [new_edits | Options]}, infinity) of
+        ok = close(Db2),
+        case gen_server:call(UpdatePid, {update_docs, DocBuckets4, [new_edits | Options]}, infinity) of
         ok -> {ok, NewRevs};
         Else -> throw(Else)
         end;
@@ -312,15 +272,11 @@
         throw(Else)
     end.
 
-save_docs(MainPid, Docs) ->
-    save_docs(MainPid, Docs, []).
-
-save_docs(MainPid, Docs, Options) ->
+save_docs(#db{update_pid=UpdatePid, fd=Fd}, Docs, Options) ->
     % flush unwritten binaries to disk.
-    Db = get_db(MainPid),
     DocBuckets = group_alike_docs(Docs),
-    DocBuckets2 = [[doc_flush_binaries(Doc, Db#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
-    ok = gen_server:call(MainPid, {update_docs, DocBuckets2, Options}, infinity).
+    DocBuckets2 = [[doc_flush_binaries(Doc, Fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+    ok = gen_server:call(UpdatePid, {update_docs, DocBuckets2, Options}, infinity).
 
 
 doc_flush_binaries(Doc, Fd) ->
@@ -379,125 +335,51 @@
     Doc#doc{attachments = NewBins}.
 
 enum_docs_since_reduce_to_count(Reds) ->
-    couch_btree:final_reduce(fun btree_by_seq_reduce/2, Reds).
+    couch_btree:final_reduce(fun couch_db_updater:btree_by_seq_reduce/2, Reds).
 
 enum_docs_reduce_to_count(Reds) ->
-    couch_btree:final_reduce(fun btree_by_id_reduce/2, Reds).
+    couch_btree:final_reduce(fun couch_db_updater:btree_by_id_reduce/2, Reds).
 
-enum_docs_since(MainPid, SinceSeq, Direction, InFun, Ctx) ->
-    Db = get_db(MainPid),
+enum_docs_since(Db, SinceSeq, Direction, InFun, Ctx) ->
     couch_btree:fold(Db#db.docinfo_by_seq_btree, SinceSeq + 1, Direction, InFun, Ctx).
 
-enum_docs_since(MainPid, SinceSeq, InFun, Acc) ->
-    enum_docs_since(MainPid, SinceSeq, fwd, InFun, Acc).
+enum_docs_since(Db, SinceSeq, InFun, Acc) ->
+    enum_docs_since(Db, SinceSeq, fwd, InFun, Acc).
 
-enum_docs(MainPid, StartId, Direction, InFun, InAcc) ->
-    Db = get_db(MainPid),
+enum_docs(Db, StartId, Direction, InFun, InAcc) ->
     couch_btree:fold(Db#db.fulldocinfo_by_id_btree, StartId, Direction, InFun, InAcc).
 
-enum_docs(MainPid, StartId, InFun, Ctx) ->
-    enum_docs(MainPid, StartId, fwd, InFun, Ctx).
+enum_docs(Db, StartId, InFun, Ctx) ->
+    enum_docs(Db, StartId, fwd, InFun, Ctx).
 
 % server functions
 
-init(InitArgs) ->
-    spawn_link(couch_db, start_update_loop, [self(), InitArgs]),
-    receive
-    {initialized, Db} ->
-        {ok, Db}
-    end.
-
-btree_by_seq_split(DocInfo) ->
-    #doc_info{
-        id = Id,
-        rev = Rev,
-        update_seq = Seq,
-        summary_pointer = Sp,
-        conflict_revs = Conflicts,
-        deleted_conflict_revs = DelConflicts,
-        deleted = Deleted} = DocInfo,
-    {Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}}.
-    
-btree_by_seq_join(Seq,{Id, Rev, Sp, Conflicts, DelConflicts, Deleted}) ->
-    #doc_info{
-        id = Id,
-        rev = Rev,
-        update_seq = Seq,
-        summary_pointer = Sp,
-        conflict_revs = Conflicts,
-        deleted_conflict_revs = DelConflicts,
-        deleted = Deleted}.
-
-btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
-        deleted=Deleted, rev_tree=Tree}) ->
-    {Id, {Seq, case Deleted of true -> 1; false-> 0 end, Tree}}.
+init({DbName, Filepath, Fd, Options}) ->
+    {ok, UpdaterPid} = gen_server:start_link(couch_db_updater, {self(), DbName, Filepath, Fd, Options}, []),
+    ok = couch_file:add_ref(Fd),
+    gen_server:call(UpdaterPid, get_db).
 
-btree_by_id_join(Id, {Seq, Deleted, Tree}) ->
-    #full_doc_info{id=Id, update_seq=Seq, deleted=Deleted==1, rev_tree=Tree}.
-    
-
-
-btree_by_id_reduce(reduce, FullDocInfos) ->
-    % count the number of deleted documents
-    length([1 || #full_doc_info{deleted=false} <- FullDocInfos]);
-btree_by_id_reduce(rereduce, Reds) ->
-    lists:sum(Reds).
-            
-btree_by_seq_reduce(reduce, DocInfos) ->
-    % count the number of deleted documents
-    length(DocInfos);
-btree_by_seq_reduce(rereduce, Reds) ->
-    lists:sum(Reds).
-
-init_db(DbName, Filepath, Fd, Header) ->
-    {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
-    ok = couch_stream:set_min_buffer(SummaryStream, 10000),
-    {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
-        [{split, fun btree_by_id_split/1},
-        {join, fun btree_by_id_join/2},
-        {reduce, fun btree_by_id_reduce/2}]),
-    {ok, SeqBtree} = couch_btree:open(Header#db_header.docinfo_by_seq_btree_state, Fd,
-            [{split, fun btree_by_seq_split/1},
-            {join, fun btree_by_seq_join/2},
-            {reduce, fun btree_by_seq_reduce/2}]),
-    {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
-
-    #db{
-        update_pid=self(),
-        fd=Fd,
-        header=Header,
-        summary_stream = SummaryStream,
-        fulldocinfo_by_id_btree = IdBtree,
-        docinfo_by_seq_btree = SeqBtree,
-        local_docs_btree = LocalDocsBtree,
-        update_seq = Header#db_header.update_seq,
-        doc_count = Header#db_header.doc_count,
-        doc_del_count = Header#db_header.doc_del_count,
-        name = DbName,
-        filepath=Filepath }.
-
-close_db(#db{fd=Fd,summary_stream=Ss}) ->
-    couch_file:close(Fd),
-    couch_stream:close(Ss).
-    
 terminate(_Reason, Db) ->
     exit(Db#db.update_pid, kill).
     
-handle_call({update_docs, DocActions, Options}, From, #db{update_pid=Updater}=Db) ->
-    Updater ! {From, update_docs, DocActions, Options},
-    {noreply, Db};
-handle_call(increment_update_seq, From, #db{update_pid=Updater}=Db) ->
-    Updater ! {From, increment_update_seq},
-    {noreply, Db};
-handle_call(get_db, _From, Db) ->
+handle_call({open_ref_counted_instance, OpenerPid}, _From, #db{fd=Fd}=Db) ->
+    ok = couch_file:add_ref(Fd, OpenerPid),
     {reply, {ok, Db}, Db};
-handle_call({db_updated, NewDb}, _From, _OldDb) ->
+handle_call(num_refs, _From, #db{fd=Fd}=Db) ->
+    {reply, couch_file:num_refs(Fd) - 1, Db};
+handle_call({db_updated, #db{fd=NewFd}=NewDb}, _From, #db{fd=OldFd}) ->
+    case NewFd == OldFd of
+    true -> ok;
+    false ->
+        couch_file:add_ref(NewFd),
+        couch_file:drop_ref(OldFd)
+    end,
     {reply, ok, NewDb}.
 
 
-handle_cast(start_compact, #db{update_pid=Updater}=Db) ->
-    Updater ! compact,
-    {noreply, Db}.
+handle_cast(Msg, Db) ->
+    ?LOG_ERROR("Bad cast message received for db ~s: ~p", [Db#db.name, Msg]),
+    exit({error, Msg}).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -508,114 +390,6 @@
 
 
 %%% Internal function %%%
-
-start_update_loop(MainPid, {DbName, Filepath, Fd, Options}) ->
-    link(Fd),
-    
-    case lists:member(create, Options) of
-    true ->
-        % create a new header and writes it to the file
-        Header =  #db_header{},
-        ok = couch_file:write_header(Fd, ?HEADER_SIG, Header),
-        % delete any old compaction files that might be hanging around
-        file:delete(Filepath ++ ".compact"),
-        file:delete(Filepath ++ ".old");
-    false ->
-        {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG)
-    end,
-    
-    Db = init_db(DbName, Filepath, Fd, Header),
-    Db2 = Db#db{main_pid=MainPid},
-    MainPid ! {initialized, Db2},
-    update_loop(Db2).
-    
-update_loop(#db{fd=Fd,name=Name,
-            filepath=Filepath,
-            main_pid=MainPid,
-            update_seq=UpdateSeq}=Db) ->
-    receive
-    {OrigFrom, update_docs, DocActions, Options} ->
-        case (catch update_docs_int(Db, DocActions, Options)) of
-        {ok, Db2} ->
-            ok = gen_server:call(MainPid, {db_updated, Db2}),
-            gen_server:reply(OrigFrom, ok),
-            couch_db_update_notifier:notify({updated, Name}),
-            update_loop(Db2);
-        retry ->
-            gen_server:reply(OrigFrom, retry),
-            update_loop(Db);
-        conflict ->
-            gen_server:reply(OrigFrom, conflict),
-            update_loop(Db);
-        Error ->
-            exit(Error) % we crashed
-        end;
-    compact ->
-        case Db#db.compactor_pid of
-        nil ->
-            ?LOG_INFO("Starting compaction for db \"~s\"", [Name]),
-            Pid = spawn_link(couch_db, start_copy_compact_int, [Db, true]),
-            Db2 = Db#db{compactor_pid=Pid},
-            ok = gen_server:call(MainPid, {db_updated, Db2}),
-            update_loop(Db2);
-        _ ->
-            update_loop(Db) % already started
-        end;
-    {compact_done, CompactFilepath} ->
-        {ok, NewFd} = couch_file:open(CompactFilepath),
-        {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG),
-        #db{update_seq=NewSeq}= NewDb =
-                init_db(Name, CompactFilepath, NewFd, NewHeader),
-        case Db#db.update_seq == NewSeq of
-        true ->
-            NewDb2 = commit_data(
-                NewDb#db{
-                    main_pid = Db#db.main_pid,
-                    doc_count = Db#db.doc_count,
-                    doc_del_count = Db#db.doc_del_count,
-                    filepath = Filepath}),
-                
-            ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]),
-            ok = file:rename(Filepath, Filepath ++ ".old"),
-            ok = file:rename(CompactFilepath, Filepath),
-            
-            couch_stream:close(Db#db.summary_stream),
-            % close file handle async.
-            % wait 5 secs before closing, allowing readers to finish
-            unlink(Fd),
-            spawn_link(fun() ->
-                receive after 5000 -> ok end,
-                couch_file:close(Fd),
-                file:delete(Filepath ++ ".old")
-                end),
-                
-            ok = gen_server:call(MainPid, {db_updated, NewDb2}),
-            ?LOG_INFO("Compaction for db ~p completed.", [Name]),
-            update_loop(NewDb2#db{compactor_pid=nil});
-        false ->
-            ?LOG_INFO("Compaction file still behind main file "
-                "(update seq=~p. compact update seq=~p). Retrying.",
-                [Db#db.update_seq, NewSeq]),
-            Pid = spawn_link(couch_db, start_copy_compact_int, [Db, false]),
-            Db2 = Db#db{compactor_pid=Pid},
-            couch_file:close(NewFd),
-            update_loop(Db2)
-        end;
-    {OrigFrom, increment_update_seq} ->
-        Db2 = commit_data(Db#db{update_seq=UpdateSeq+1}),
-        ok = gen_server:call(MainPid, {db_updated, Db2}),
-        gen_server:reply(OrigFrom, {ok, UpdateSeq+1}),
-        couch_db_update_notifier:notify({updated, Name}),
-        update_loop(Db2);
-    Else ->
-        ?LOG_ERROR("Unknown message received in db ~s:~p", [Db#db.name, Else]),
-        exit({error, Else})
-    end.
-
-get_db(MainPid) ->
-    {ok, Db} = gen_server:call(MainPid, get_db),
-    Db.
-
 open_doc_revs_int(Db, IdRevs, Options) ->
     Ids = [Id || {Id, _Revs} <- IdRevs],
     LookupResults = get_full_doc_infos(Db, Ids),
@@ -711,16 +485,6 @@
         end
     end.
 
-% rev tree functions
-
-doc_to_tree(Doc) ->
-    doc_to_tree(Doc, lists:reverse(Doc#doc.revs)).
-
-doc_to_tree(Doc, [RevId]) ->
-    [{RevId, Doc, []}];
-doc_to_tree(Doc, [RevId | Rest]) ->
-    [{RevId, ?REV_MISSING, doc_to_tree(Doc, Rest)}].
-
 make_doc(Db, Id, Deleted, SummaryPointer, RevisionPath) ->
     {BodyData, BinValues} =
     case SummaryPointer of
@@ -737,303 +501,6 @@
         attachments = BinValues,
         deleted = Deleted
         }.
-
-flush_trees(_Db, [], AccFlushedTrees) ->
-    {ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
-        #full_doc_info{rev_tree=Unflushed} = InfoUnflushed,
-        Flushed = couch_key_tree:map(
-        fun(_Rev, Value) ->
-            case Value of
-            #doc{attachments=Atts,deleted=IsDeleted}=Doc ->
-                % this node value is actually an unwritten document summary,
-                % write to disk.
-                % make sure the Fd in the written bins is the same Fd we are.
-                Bins =
-                case Atts of
-                [] -> [];
-                [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd ->
-                    % convert bins, removing the FD.
-                    % All bins should have been flushed to disk already.
-                    [{BinName, {BinType, BinSp, BinLen}}
-                        || {BinName, {BinType, {_Fd, BinSp, BinLen}}}
-                        <- Atts];
-                _ ->
-                    % BinFd must not equal our Fd. This can happen when a database
-                    % is being updated during a compaction
-                    ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
-                    throw(retry)
-                end,
-                {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
-                {IsDeleted, NewSummaryPointer};
-            _ ->
-                Value
-            end
-        end, Unflushed),
-    flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
-
-merge_rev_trees(_NoConflicts, [], [], AccNewInfos, AccSeq) ->
-    {ok, lists:reverse(AccNewInfos), AccSeq};
-merge_rev_trees(NoConflicts, [NewDocs|RestDocsList],
-        [OldDocInfo|RestOldInfo], AccNewInfos, AccSeq) ->
-    #full_doc_info{id=Id,rev_tree=OldTree}=OldDocInfo,
-    UpdatesRevTree = lists:foldl(
-        fun(NewDoc, AccTree) ->
-            couch_key_tree:merge(AccTree, doc_to_tree(NewDoc))
-        end,
-        [], NewDocs),
-    NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree),
-    if NewRevTree == OldTree ->
-        % nothing changed
-        merge_rev_trees(NoConflicts, RestDocsList, RestOldInfo, AccNewInfos, AccSeq);
-    true ->
-        if NoConflicts andalso OldTree /= [] ->
-            OldConflicts = couch_key_tree:count_leafs(OldTree),
-            NewConflicts = couch_key_tree:count_leafs(NewRevTree),
-            if NewConflicts > OldConflicts ->
-                throw(conflict);
-            true -> ok
-            end;
-        true -> ok
-        end,
-        NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
-        merge_rev_trees(NoConflicts, RestDocsList,RestOldInfo, 
-                [NewInfo|AccNewInfos],AccSeq+1)
-    end.
-
-new_index_entries([], DocCount, DelCount, AccById, AccBySeq) ->
-    {ok, DocCount, DelCount, AccById, AccBySeq};
-new_index_entries([FullDocInfo|RestInfos], DocCount, DelCount, AccById, AccBySeq) ->
-    #doc_info{deleted=Deleted} = DocInfo = couch_doc:to_doc_info(FullDocInfo),
-    {DocCount2, DelCount2} =
-    if Deleted -> {DocCount, DelCount + 1};
-    true -> {DocCount + 1, DelCount} 
-    end,
-    new_index_entries(RestInfos, DocCount2, DelCount2, 
-        [FullDocInfo#full_doc_info{deleted=Deleted}|AccById],
-        [DocInfo|AccBySeq]).
-
-update_docs_int(Db, DocsList, Options) ->
-    #db{
-        fulldocinfo_by_id_btree = DocInfoByIdBTree,
-        docinfo_by_seq_btree = DocInfoBySeqBTree,
-        update_seq = LastSeq,
-        doc_count = FullDocCount,
-        doc_del_count = FullDelCount
-        } = Db,
-
-    % separate out the NonRep documents from the rest of the documents
-    {DocsList2, NonRepDocs} = lists:foldl(
-        fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
-            case Id of
-            ?LOCAL_DOC_PREFIX ++ _ when Rest==[] ->
-                % when saving NR (non rep) documents, you can only save a single rev
-                {DocsListAcc, [Doc | NonRepDocsAcc]};
-            Id->
-                {[Docs | DocsListAcc], NonRepDocsAcc}
-            end
-        end, {[], []}, DocsList),
-    
-    Ids = [Id || [#doc{id=Id}|_] <- DocsList2], 
-    
-    % lookup up the existing documents, if they exist.
-    OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
-    OldDocInfos = lists:zipwith(
-        fun(_Id, {ok, FullDocInfo}) ->
-            FullDocInfo;
-        (Id, not_found) ->
-            #full_doc_info{id=Id}
-        end,
-        Ids, OldDocLookups),
-    
-    {OldCount, OldDelCount} = lists:foldl(
-        fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) ->
-            case couch_doc:to_doc_info(FullDocInfo) of
-            #doc_info{deleted=false} ->
-                {OldCountAcc + 1, OldDelCountAcc};
-            _ ->
-                {OldCountAcc, OldDelCountAcc + 1}
-            end;
-        (not_found, Acc) ->
-            Acc
-        end, {0, 0}, OldDocLookups),
-    
-    % Merge the new docs into the revision trees.
-    NoConflicts = lists:member(new_edits, Options),
-    {ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq),
-    
-    RemoveSeqs =
-        [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups],
-    
-    % All regular documents are now ready to write.
-    
-    % Try to write the local documents first, a conflict might be generated
-    {ok, Db2}  = update_local_docs(Db, NonRepDocs),
-    
-    % Write out the documents summaries (they are stored in the nodes of the rev trees)
-    {ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []),
-    
-    {ok, NewDocsCount, NewDelCount, InfoById, InfoBySeq} =
-        new_index_entries(FlushedDocInfos, 0, 0, [], []),
-
-    % and the indexes to the documents
-    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs),
-    {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []),
-
-    Db3 = Db2#db{
-        fulldocinfo_by_id_btree = DocInfoByIdBTree2,
-        docinfo_by_seq_btree = DocInfoBySeqBTree2,
-        update_seq = NewSeq,
-        doc_count = FullDocCount + NewDocsCount - OldCount,
-        doc_del_count = FullDelCount + NewDelCount - OldDelCount},
-
-    case lists:member(delay_commit, Options) of
-    true ->
-        {ok, Db3};
-    false ->
-        {ok, commit_data(Db3)}
-    end.
-
-update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
-    Ids = [Id || #doc{id=Id} <- Docs],
-    OldDocLookups = couch_btree:lookup(Btree, Ids),
-    BtreeEntries = lists:zipwith(
-        fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) ->
-            NewRev =
-            case Revs of
-                [] -> 0;
-                [RevStr|_] -> list_to_integer(RevStr)
-            end,
-            OldRev =
-            case OldDocLookup of
-                {ok, {_, {OldRev0, _}}} -> OldRev0;
-                not_found -> 0
-            end,
-            case OldRev + 1 == NewRev of
-            true ->
-                case Delete of
-                    false -> {update, {Id, {NewRev, Body}}};
-                    true  -> {remove, Id}
-                end;
-            false ->
-                throw(conflict)
-            end
-            
-        end, Docs, OldDocLookups),
-
-    BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
-    BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries],
-
-    {ok, Btree2} =
-        couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
-
-    {ok, Db#db{local_docs_btree = Btree2}}.
-
-
-
-commit_data(#db{fd=Fd, header=Header} = Db) ->
-    Header2 = Header#db_header{
-        update_seq = Db#db.update_seq,
-        summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
-        docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
-        fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
-        local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
-        doc_count = Db#db.doc_count,
-        doc_del_count = Db#db.doc_del_count
-        },
-    if Header == Header2 ->
-        Db; % unchanged. nothing to do
-    true ->
-        ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2),
-        Db#db{header = Header2}
-    end.
-
-copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
-    {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
-    % copy the bin values
-    NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
-        {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
-        {Name, {Type, NewBinSp, Len}}
-        end, BinInfos),
-    % now write the document summary
-    {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}),
-    Sp.
-
-copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
-    [];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | RestTree]) ->
-    % This is a leaf node, copy it over
-    NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
-    [{RevId, {IsDel, NewSp}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
-    % inner node, only copy info/data from leaf nodes
-    [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)].
-    
-copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) ->
-    Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
-    LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
-    NewFullDocInfos = lists:map(
-        fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
-            Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
-        end, LookupResults),
-    NewDocInfos = [couch_doc:to_doc_info(FullDocInfo) || FullDocInfo <- NewFullDocInfos],
-    {ok, DocInfoBTree} =
-        couch_btree:add_remove(NewDb#db.docinfo_by_seq_btree, NewDocInfos, []),
-    {ok, FullDocInfoBTree} =
-        couch_btree:add_remove(NewDb#db.fulldocinfo_by_id_btree, NewFullDocInfos, []),
-    NewDb#db{fulldocinfo_by_id_btree=FullDocInfoBTree, docinfo_by_seq_btree=DocInfoBTree}.
-
-
-          
-copy_compact_docs(Db, NewDb) ->
-    EnumBySeqFun =
-    fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) ->
-        case couch_util:should_flush() of
-        true ->
-            NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied])),
-            {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}};
-        false ->    
-            {ok, {AccNewDb, [DocInfo | AccUncopied]}}
-        end
-    end,
-    {ok, {NewDb2, Uncopied}} =
-        couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, []}),
-
-    case Uncopied of
-    [#doc_info{update_seq=LastSeq} | _] ->
-        commit_data( copy_docs(Db, NewDb2#db{update_seq=LastSeq},
-            lists:reverse(Uncopied)));
-    [] ->
-        NewDb2
-    end.
-
-start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) ->
-    CompactFile = Filepath ++ ".compact",
-    ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
-    case couch_file:open(CompactFile) of
-    {ok, Fd} ->
-        ?LOG_DEBUG("Found existing compaction file for db \"~s\"", [Name]),
-        {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
-    {error, enoent} -> %
-        {ok, Fd} = couch_file:open(CompactFile, [create]),
-        Header =  #db_header{},
-        ok = couch_file:write_header(Fd, ?HEADER_SIG, Header)
-    end,
-    NewDb = init_db(Name, CompactFile, Fd, Header),
-    NewDb2 = copy_compact_docs(Db, NewDb),
-    NewDb3 =
-    case CopyLocal of
-    true ->
-        % suck up all the local docs into memory and write them to the new db
-        {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
-                fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
-        {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs),
-        commit_data(NewDb2#db{local_docs_btree=NewLocalBtree});
-    _ ->
-        NewDb2
-    end,
-    close_db(NewDb3),
-    Db#db.update_pid ! {compact_done, CompactFile}.
     
     
     
\ No newline at end of file

Modified: incubator/couchdb/trunk/src/couchdb/couch_db.hrl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_db.hrl?rev=682560&r1=682559&r2=682560&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_db.hrl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_db.hrl Mon Aug  4 18:43:40 2008
@@ -69,3 +69,40 @@
     % couch_db:open_doc(Db, Id, Options).
     meta = []
     }).
+    
+    
+
+
+
+-record(db_header,
+    {write_version = 0,
+     update_seq = 0,
+     summary_stream_state = nil,
+     fulldocinfo_by_id_btree_state = nil,
+     docinfo_by_seq_btree_state = nil,
+     local_docs_btree_state = nil,
+     doc_count=0,
+     doc_del_count=0
+    }).
+
+-record(db,
+    {main_pid=nil,
+    update_pid=nil,
+    compactor_pid=nil,
+    fd,
+    header = #db_header{},
+    summary_stream,
+    fulldocinfo_by_id_btree,
+    docinfo_by_seq_btree,
+    local_docs_btree,
+    update_seq,
+    doc_count,
+    doc_del_count,
+    name,
+    filepath
+    }).
+    
+    
+
+% small value used in revision trees to indicate the revision isn't stored
+-define(REV_MISSING, []).

Added: incubator/couchdb/trunk/src/couchdb/couch_db_updater.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_db_updater.erl?rev=682560&view=auto
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_db_updater.erl (added)
+++ incubator/couchdb/trunk/src/couchdb/couch_db_updater.erl Mon Aug  4 18:43:40 2008
@@ -0,0 +1,499 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License.  You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_db_updater).
+-behaviour(gen_server).
+
+-export([btree_by_id_reduce/2,btree_by_seq_reduce/2]).
+-export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
+
+-include("couch_db.hrl").
+
+-define(HEADER_SIG, <<$g, $m, $k, 0>>).
+
+% gen_server callback: builds the updater state for one database file.
+% When Options contains `create`, a fresh header is written to Fd and any
+% leftover ".compact"/".old" files are removed; otherwise the existing
+% header is read from Fd.
+init({MainPid, DbName, Filepath, Fd, Options}) ->
+    link(Fd),
+    Header =
+    case lists:member(create, Options) of
+    true ->
+        % brand new database: start from an empty header and persist it
+        FreshHeader = #db_header{},
+        ok = couch_file:write_header(Fd, ?HEADER_SIG, FreshHeader),
+        % delete any old compaction files that might be hanging around
+        file:delete(Filepath ++ ".compact"),
+        file:delete(Filepath ++ ".old"),
+        FreshHeader;
+    false ->
+        {ok, StoredHeader} = couch_file:read_header(Fd, ?HEADER_SIG),
+        StoredHeader
+    end,
+    InitialDb = init_db(DbName, Filepath, Fd, Header),
+    {ok, InitialDb#db{main_pid=MainPid}}.
+
+% gen_server callback: closes the database file and summary stream.
+terminate(_Why, State) ->
+    close_db(State).
+
+% gen_server callback for synchronous requests:
+%   get_db                             - replies with the current #db record
+%   {update_docs, DocActions, Options} - applies the updates; replies ok, or
+%                                        retry / conflict when thrown
+%   increment_update_seq               - bumps and commits the update seq
+handle_call(get_db, _From, Db) ->
+    {reply, {ok, Db}, Db};
+handle_call({update_docs, DocActions, Options}, _From, Db) ->
+    try update_docs_int(Db, DocActions, Options) of
+    {ok, UpdatedDb} ->
+        % hand the new state to the main process before notifying listeners
+        ok = gen_server:call(Db#db.main_pid, {db_updated, UpdatedDb}),
+        couch_db_update_notifier:notify({updated, UpdatedDb#db.name}),
+        {reply, ok, UpdatedDb}
+    catch
+        throw:Outcome when Outcome =:= retry; Outcome =:= conflict ->
+            % the state is left untouched; the caller sees the thrown atom
+            {reply, Outcome, Db}
+    end;
+handle_call(increment_update_seq, _From, Db) ->
+    NextSeq = Db#db.update_seq + 1,
+    CommittedDb = commit_data(Db#db{update_seq=NextSeq}),
+    ok = gen_server:call(CommittedDb#db.main_pid, {db_updated, CommittedDb}),
+    couch_db_update_notifier:notify({updated, Db#db.name}),
+    {reply, {ok, CommittedDb#db.update_seq}, CommittedDb}.
+
+    
+% gen_server callback for asynchronous requests:
+%   start_compact - spawns a linked compaction process unless one is
+%                   already recorded in compactor_pid
+%   {compact_done, CompactFilepath} - cast by the compaction process when a
+%                   pass finishes; swaps in the compacted file if it has
+%                   caught up with the live database, otherwise starts
+%                   another compaction pass
+handle_cast(start_compact, Db) ->
+    case Db#db.compactor_pid of
+    nil ->
+        ?LOG_INFO("Starting compaction for db \"~s\"", [Db#db.name]),
+        Pid = spawn_link(fun() -> start_copy_compact_int(Db, true) end),
+        Db2 = Db#db{compactor_pid=Pid},
+        ok = gen_server:call(Db#db.main_pid, {db_updated, Db2}),
+        {noreply, Db2};
+    _ ->
+        % compact currently running, this is a no-op
+        {noreply, Db}
+    end;
+handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
+    {ok, NewFd} = couch_file:open(CompactFilepath),
+    {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG),
+    #db{update_seq=NewSeq} = NewDb =
+            init_db(Db#db.name, CompactFilepath, NewFd, NewHeader),
+    % the compacted copy may only replace the live file if both are at the
+    % same update sequence
+    case Db#db.update_seq == NewSeq of
+    true ->
+        % carry over main_pid, the counters and the real path from the live
+        % db, then persist the header in the compacted file
+        NewDb2 = commit_data(
+            NewDb#db{
+                main_pid = Db#db.main_pid,
+                doc_count = Db#db.doc_count,
+                doc_del_count = Db#db.doc_del_count,
+                filepath = Filepath}),
+            
+        ?LOG_DEBUG("CouchDB swapping files ~s and ~s.", [Filepath, CompactFilepath]),
+        % NOTE(review): the old file is deleted before the rename, so there
+        % is a window with no file at Filepath - confirm this is acceptable
+        file:delete(Filepath),
+        ok = file:rename(CompactFilepath, Filepath),
+        
+        % release the resources of the replaced database
+        couch_stream:close(Db#db.summary_stream),
+        couch_file:close_maybe(Db#db.fd),
+        file:delete(Filepath ++ ".old"),
+            
+        ok = gen_server:call(Db#db.main_pid, {db_updated, NewDb2}),
+        ?LOG_INFO("Compaction for db ~p completed.", [Db#db.name]),
+        {noreply, NewDb2#db{compactor_pid=nil}};
+    false ->
+        ?LOG_INFO("Compaction file still behind main file "
+            "(update seq=~p. compact update seq=~p). Retrying.",
+            [Db#db.update_seq, NewSeq]),
+        % run another pass; local docs are not copied again (CopyLocal=false)
+        % NOTE(review): unlike start_compact, the new compactor pid is not
+        % reported to main_pid via db_updated here - confirm intended
+        Pid = spawn_link(fun() -> start_copy_compact_int(Db, false) end),
+        Db2 = Db#db{compactor_pid=Pid},
+        couch_file:close(NewFd),
+        {noreply, Db2}
+    end.
+
+% gen_server callback: no out-of-band messages are expected, so the message
+% is logged and the updater process exits.
+handle_info(Unexpected, Db) ->
+    ?LOG_ERROR("Bad message received for db ~s: ~p", [Db#db.name, Unexpected]),
+    exit({error, Unexpected}).
+
+% gen_server callback: the state is passed through unchanged on upgrade.
+code_change(_OldVsn, Db, _Extra) ->
+    {ok, Db}.
+
+
+% Converts a #doc_info record into the {Key, Value} pair handed to the
+% by-sequence btree; the update sequence is the key.
+btree_by_seq_split(DocInfo) ->
+    #doc_info{
+        update_seq = UpdateSeq,
+        id = DocId,
+        rev = RevId,
+        summary_pointer = SummaryPtr,
+        conflict_revs = ConflictRevs,
+        deleted_conflict_revs = DelConflictRevs,
+        deleted = IsDeleted} = DocInfo,
+    Value = {DocId, RevId, SummaryPtr, ConflictRevs, DelConflictRevs, IsDeleted},
+    {UpdateSeq, Value}.
+    
+% Rebuilds a #doc_info record from a by-sequence btree {Key, Value} pair;
+% the inverse of btree_by_seq_split/1.
+btree_by_seq_join(UpdateSeq,
+        {DocId, RevId, SummaryPtr, ConflictRevs, DelConflictRevs, IsDeleted}) ->
+    #doc_info{
+        update_seq = UpdateSeq,
+        id = DocId,
+        rev = RevId,
+        summary_pointer = SummaryPtr,
+        conflict_revs = ConflictRevs,
+        deleted_conflict_revs = DelConflictRevs,
+        deleted = IsDeleted}.
+
+% Converts a #full_doc_info record into the {Key, Value} pair handed to the
+% by-id btree. The deleted flag is encoded as 1 (true) or 0 (false).
+btree_by_id_split(#full_doc_info{id=Id, update_seq=Seq,
+        deleted=Deleted, rev_tree=Tree}) ->
+    DeletedFlag =
+    case Deleted of
+    true -> 1;
+    false -> 0
+    end,
+    {Id, {Seq, DeletedFlag, Tree}}.
+
+% Rebuilds a #full_doc_info record from a by-id btree {Key, Value} pair.
+% A stored flag equal to 1 means deleted; any other value means not deleted.
+btree_by_id_join(Id, {Seq, DeletedFlag, Tree}) ->
+    IsDeleted = (DeletedFlag == 1),
+    #full_doc_info{id=Id, update_seq=Seq, deleted=IsDeleted, rev_tree=Tree}.
+    
+
+
+% Reduce function installed on the by-id btree (see init_db/4).
+btree_by_id_reduce(reduce, FullDocInfos) ->
+    % count the entries whose deleted flag is false (i.e. live documents)
+    length([1 || #full_doc_info{deleted=false} <- FullDocInfos]);
+btree_by_id_reduce(rereduce, Reds) ->
+    % combine previously computed counts
+    lists:sum(Reds).
+            
+% Reduce function installed on the by-sequence btree (see init_db/4).
+btree_by_seq_reduce(reduce, DocInfos) ->
+    % count every entry, deleted or not
+    length(DocInfos);
+btree_by_seq_reduce(rereduce, Reds) ->
+    % combine previously computed counts
+    lists:sum(Reds).
+
+% Builds the in-memory #db record for an open database file: opens the
+% summary stream and the three btrees from the states saved in Header and
+% copies the sequence and document counters from Header. update_pid is set
+% to the calling process.
+init_db(DbName, Filepath, Fd, Header) ->
+    {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
+    ok = couch_stream:set_min_buffer(SummaryStream, 10000),
+    ByIdOptions = [
+        {split, fun btree_by_id_split/1},
+        {join, fun btree_by_id_join/2},
+        {reduce, fun btree_by_id_reduce/2}],
+    {ok, IdBtree} = couch_btree:open(
+        Header#db_header.fulldocinfo_by_id_btree_state, Fd, ByIdOptions),
+    BySeqOptions = [
+        {split, fun btree_by_seq_split/1},
+        {join, fun btree_by_seq_join/2},
+        {reduce, fun btree_by_seq_reduce/2}],
+    {ok, SeqBtree} = couch_btree:open(
+        Header#db_header.docinfo_by_seq_btree_state, Fd, BySeqOptions),
+    % the local docs btree is opened without split/join/reduce options
+    {ok, LocalDocsBtree} = couch_btree:open(Header#db_header.local_docs_btree_state, Fd),
+
+    #db{
+        name = DbName,
+        filepath = Filepath,
+        fd = Fd,
+        header = Header,
+        update_pid = self(),
+        summary_stream = SummaryStream,
+        fulldocinfo_by_id_btree = IdBtree,
+        docinfo_by_seq_btree = SeqBtree,
+        local_docs_btree = LocalDocsBtree,
+        update_seq = Header#db_header.update_seq,
+        doc_count = Header#db_header.doc_count,
+        doc_del_count = Header#db_header.doc_del_count}.
+
+% Releases what a #db record holds: asks the file process to close, then
+% closes the summary stream.
+close_db(#db{summary_stream=Stream, fd=File}) ->
+    couch_file:close(File),
+    couch_stream:close(Stream).
+
+% rev tree functions
+
+% Builds a single-branch revision tree for Doc. The revision list is
+% reversed first, so the last element of Doc#doc.revs becomes the root and
+% the first element becomes the leaf.
+doc_to_tree(Doc) ->
+    RootFirstRevs = lists:reverse(Doc#doc.revs),
+    doc_to_tree(Doc, RootFirstRevs).
+
+% Only the final revision id carries the document itself; every node above
+% it is a ?REV_MISSING placeholder with a single child.
+doc_to_tree(Doc, [LeafRev]) ->
+    [{LeafRev, Doc, []}];
+doc_to_tree(Doc, [AncestorRev | Descendants]) ->
+    Subtree = doc_to_tree(Doc, Descendants),
+    [{AncestorRev, ?REV_MISSING, Subtree}].
+
+% Writes every not-yet-written document held in the revision trees of the
+% given #full_doc_info records to the db's summary stream. Each #doc node
+% value is replaced by {IsDeleted, SummaryPointer}; all other node values
+% are kept as they are. Returns {ok, FlushedInfos} in input order.
+% Throws `retry` when a document's attachments were written through a file
+% descriptor other than this db's Fd.
+flush_trees(_Db, [], AccFlushedTrees) ->
+    {ok, lists:reverse(AccFlushedTrees)};
+flush_trees(#db{fd=Fd}=Db, [InfoUnflushed | RestUnflushed], AccFlushed) ->
+        #full_doc_info{rev_tree=Unflushed} = InfoUnflushed,
+        Flushed = couch_key_tree:map(
+        fun(_Rev, Value) ->
+            case Value of
+            #doc{attachments=Atts,deleted=IsDeleted}=Doc ->
+                % this node value is actually an unwritten document summary,
+                % write to disk.
+                % make sure the Fd in the written bins is the same Fd we are.
+                Bins =
+                case Atts of
+                [] -> [];
+                % only the first attachment's Fd is compared against ours
+                [{_BName, {_Type, {BinFd, _Sp, _Len}}} | _ ] when BinFd == Fd ->
+                    % convert bins, removing the FD.
+                    % All bins should have been flushed to disk already.
+                    [{BinName, {BinType, BinSp, BinLen}}
+                        || {BinName, {BinType, {_Fd, BinSp, BinLen}}}
+                        <- Atts];
+                _ ->
+                    % BinFd must not equal our Fd. This can happen when a database
+                    % is being switched out during a compaction
+                    ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
+                    throw(retry)
+                end,
+                % the stored summary is the body plus the Fd-less attachment info
+                {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
+                {IsDeleted, NewSummaryPointer};
+            _ ->
+                Value
+            end
+        end, Unflushed),
+    flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
+
+% Merges each group of new documents into the revision tree of the matching
+% existing #full_doc_info (the two lists are walked in lock step).
+% Returns {ok, NewInfos, LastSeq}: NewInfos holds an entry only for documents
+% whose tree actually changed, each stamped with the next update sequence.
+% When NoConflicts is true and the document already had a tree, `conflict`
+% is thrown if the merge increased the number of leafs.
+merge_rev_trees(_NoConflicts, [], [], AccNewInfos, AccSeq) ->
+    {ok, lists:reverse(AccNewInfos), AccSeq};
+merge_rev_trees(NoConflicts, [NewDocs|RestDocsList],
+        [OldDocInfo|RestOldInfo], AccNewInfos, AccSeq) ->
+    #full_doc_info{id=Id,rev_tree=OldTree}=OldDocInfo,
+    % first combine all new revisions of this document into one tree
+    UpdatesRevTree = lists:foldl(
+        fun(NewDoc, AccTree) ->
+            couch_key_tree:merge(AccTree, doc_to_tree(NewDoc))
+        end,
+        [], NewDocs),
+    NewRevTree = couch_key_tree:merge(OldTree, UpdatesRevTree),
+    if NewRevTree == OldTree ->
+        % nothing changed
+        merge_rev_trees(NoConflicts, RestDocsList, RestOldInfo, AccNewInfos, AccSeq);
+    true ->
+        % a brand new document (empty old tree) is never treated as a conflict
+        if NoConflicts andalso OldTree /= [] ->
+            OldConflicts = couch_key_tree:count_leafs(OldTree),
+            NewConflicts = couch_key_tree:count_leafs(NewRevTree),
+            if NewConflicts > OldConflicts ->
+                throw(conflict);
+            true -> ok
+            end;
+        true -> ok
+        end,
+        NewInfo = #full_doc_info{id=Id,update_seq=AccSeq+1,rev_tree=NewRevTree},
+        merge_rev_trees(NoConflicts, RestDocsList,RestOldInfo, 
+                [NewInfo|AccNewInfos],AccSeq+1)
+    end.
+
+% Derives the index entries for a list of flushed #full_doc_info records.
+% Returns {ok, DocCount, DelCount, ById, BySeq}: the two counters are
+% incremented per non-deleted / deleted document, ById collects the
+% #full_doc_info records (with their deleted flag filled in) and BySeq the
+% matching #doc_info records. Both lists come out in reverse input order.
+new_index_entries([], DocCount, DelCount, AccById, AccBySeq) ->
+    {ok, DocCount, DelCount, AccById, AccBySeq};
+new_index_entries([FullInfo|Remaining], DocCount, DelCount, AccById, AccBySeq) ->
+    DocInfo = couch_doc:to_doc_info(FullInfo),
+    #doc_info{deleted=Deleted} = DocInfo,
+    {NextDocCount, NextDelCount} =
+    case Deleted of
+    true -> {DocCount, DelCount + 1};
+    _ -> {DocCount + 1, DelCount}
+    end,
+    ByIdEntry = FullInfo#full_doc_info{deleted=Deleted},
+    new_index_entries(Remaining, NextDocCount, NextDelCount,
+        [ByIdEntry|AccById],
+        [DocInfo|AccBySeq]).
+
+% Applies a batch of document updates to Db and returns {ok, NewDb}.
+% DocsList is a list of non-empty lists of #doc records, one inner list per
+% document id. Options recognised here: `new_edits` (reject updates that
+% would add a conflict) and `delay_commit` (skip writing the header).
+% May throw `conflict` or `retry` (raised by the helpers it calls).
+update_docs_int(Db, DocsList, Options) ->
+    #db{
+        fulldocinfo_by_id_btree = DocInfoByIdBTree,
+        docinfo_by_seq_btree = DocInfoBySeqBTree,
+        update_seq = LastSeq,
+        doc_count = FullDocCount,
+        doc_del_count = FullDelCount
+        } = Db,
+
+    % separate out the NonRep documents from the rest of the documents
+    % (both result lists end up in reverse input order)
+    {DocsList2, NonRepDocs} = lists:foldl(
+        fun([#doc{id=Id}=Doc | Rest]=Docs, {DocsListAcc, NonRepDocsAcc}) ->
+            case Id of
+            ?LOCAL_DOC_PREFIX ++ _ when Rest==[] ->
+                % when saving NR (non rep) documents, you can only save a single rev
+                {DocsListAcc, [Doc | NonRepDocsAcc]};
+            Id->
+                {[Docs | DocsListAcc], NonRepDocsAcc}
+            end
+        end, {[], []}, DocsList),
+    
+    Ids = [Id || [#doc{id=Id}|_] <- DocsList2], 
+    
+    % look up the existing documents, if they exist.
+    OldDocLookups = couch_btree:lookup(DocInfoByIdBTree, Ids),
+    % documents that do not exist yet get an empty #full_doc_info
+    OldDocInfos = lists:zipwith(
+        fun(_Id, {ok, FullDocInfo}) ->
+            FullDocInfo;
+        (Id, not_found) ->
+            #full_doc_info{id=Id}
+        end,
+        Ids, OldDocLookups),
+    
+    % how many of the touched documents were live / deleted before the update
+    {OldCount, OldDelCount} = lists:foldl(
+        fun({ok, FullDocInfo}, {OldCountAcc, OldDelCountAcc}) ->
+            case couch_doc:to_doc_info(FullDocInfo) of
+            #doc_info{deleted=false} ->
+                {OldCountAcc + 1, OldDelCountAcc};
+            _ ->
+                {OldCountAcc, OldDelCountAcc + 1}
+            end;
+        (not_found, Acc) ->
+            Acc
+        end, {0, 0}, OldDocLookups),
+    
+    % Merge the new docs into the revision trees.
+    NoConflicts = lists:member(new_edits, Options),
+    {ok, NewDocInfos, NewSeq} = merge_rev_trees(NoConflicts, DocsList2, OldDocInfos, [], LastSeq),
+    
+    % the old by-sequence entries of every pre-existing document are removed
+    RemoveSeqs =
+        [ OldSeq || {ok, #full_doc_info{update_seq=OldSeq}} <- OldDocLookups],
+    
+    % All regular documents are now ready to write.
+    
+    % Try to write the local documents first, a conflict might be generated
+    {ok, Db2}  = update_local_docs(Db, NonRepDocs),
+    
+    % Write out the documents summaries (they are stored in the nodes of the rev trees)
+    {ok, FlushedDocInfos} = flush_trees(Db2, NewDocInfos, []),
+    
+    {ok, NewDocsCount, NewDelCount, InfoById, InfoBySeq} =
+        new_index_entries(FlushedDocInfos, 0, 0, [], []),
+
+    % and the indexes to the documents
+    {ok, DocInfoBySeqBTree2} = couch_btree:add_remove(DocInfoBySeqBTree, InfoBySeq, RemoveSeqs),
+    {ok, DocInfoByIdBTree2} = couch_btree:add_remove(DocInfoByIdBTree, InfoById, []),
+
+    % NOTE(review): OldCount/OldDelCount cover every looked-up document while
+    % NewDocsCount/NewDelCount only cover documents whose tree changed -
+    % confirm the counters stay right when an update is a no-op
+    Db3 = Db2#db{
+        fulldocinfo_by_id_btree = DocInfoByIdBTree2,
+        docinfo_by_seq_btree = DocInfoBySeqBTree2,
+        update_seq = NewSeq,
+        doc_count = FullDocCount + NewDocsCount - OldCount,
+        doc_del_count = FullDelCount + NewDelCount - OldDelCount},
+
+    case lists:member(delay_commit, Options) of
+    true ->
+        {ok, Db3};
+    false ->
+        {ok, commit_data(Db3)}
+    end.
+
+% Writes (or removes) local documents in the local docs btree and returns
+% {ok, NewDb}. Each doc's revision is the integer parsed from the head of
+% its revs list (0 when the list is empty) and must be exactly one greater
+% than the stored revision (0 when nothing is stored); otherwise `conflict`
+% is thrown.
+update_local_docs(#db{local_docs_btree=Btree}=Db, Docs) ->
+    Ids = [Id || #doc{id=Id} <- Docs],
+    OldDocLookups = couch_btree:lookup(Btree, Ids),
+    BtreeEntries = lists:zipwith(
+        fun(#doc{id=Id,deleted=Delete,revs=Revs,body=Body}, OldDocLookup) ->
+            NewRev =
+            case Revs of
+                [] -> 0;
+                [RevStr|_] -> list_to_integer(RevStr)
+            end,
+            OldRev =
+            case OldDocLookup of
+                {ok, {_, {OldRev0, _}}} -> OldRev0;
+                not_found -> 0
+            end,
+            % only the direct successor of the stored revision is accepted
+            case OldRev + 1 == NewRev of
+            true ->
+                case Delete of
+                    false -> {update, {Id, {NewRev, Body}}};
+                    true  -> {remove, Id}
+                end;
+            false ->
+                throw(conflict)
+            end
+            
+        end, Docs, OldDocLookups),
+
+    % split the tagged entries into the two lists add_remove expects
+    BtreeIdsRemove = [Id || {remove, Id} <- BtreeEntries],
+    BtreeIdsUpdate = [ByIdDocInfo || {update, ByIdDocInfo} <- BtreeEntries],
+
+    {ok, Btree2} =
+        couch_btree:add_remove(Btree, BtreeIdsUpdate, BtreeIdsRemove),
+
+    {ok, Db#db{local_docs_btree = Btree2}}.
+
+
+
+% Persists the current state of Db in the file header. A new header is
+% assembled from the update sequence, the stream/btree states and the
+% document counters; it is written only when it differs from the header
+% already stored in Db. Returns the (possibly updated) #db record.
+commit_data(#db{fd=Fd, header=OldHeader} = Db) ->
+    NewHeader = OldHeader#db_header{
+        update_seq = Db#db.update_seq,
+        summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
+        docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
+        fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
+        local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
+        doc_count = Db#db.doc_count,
+        doc_del_count = Db#db.doc_del_count
+        },
+    case OldHeader == NewHeader of
+    true ->
+        % unchanged. nothing to do
+        Db;
+    false ->
+        ok = couch_file:write_header(Fd, ?HEADER_SIG, NewHeader),
+        Db#db{header = NewHeader}
+    end.
+
+% Copies one stored document summary from the source file to the
+% destination: every attachment is copied into a new stream on DestFd, then
+% the body plus the relocated attachment info is written to DestStream.
+% Returns the new summary pointer.
+copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
+    {ok, {Body, Attachments}} = couch_stream:read_term(SrcFd, SrcSp),
+    % copy the bin values
+    CopyAttachment =
+    fun({Name, {Type, OldSp, Len}}) ->
+        {ok, NewSp} = couch_stream:copy_to_new_stream(SrcFd, OldSp, Len, DestFd),
+        {Name, {Type, NewSp, Len}}
+    end,
+    CopiedAttachments = lists:map(CopyAttachment, Attachments),
+    % now write the document summary
+    {ok, SummaryPointer} = couch_stream:write_term(DestStream, {Body, CopiedAttachments}),
+    SummaryPointer.
+
+% Copies a revision tree into the destination file. Leaf nodes holding
+% {IsDel, SummaryPointer} have their document copied and get the new
+% pointer; every other node is kept as a ?REV_MISSING placeholder with its
+% subtree copied recursively.
+copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
+    [];
+copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp}, []} | Siblings]) ->
+    % This is a leaf node, copy it over
+    CopiedSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
+    CopiedSiblings = copy_rev_tree(SrcFd, DestFd, DestStream, Siblings),
+    [{RevId, {IsDel, CopiedSp}, []} | CopiedSiblings];
+copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, Children} | Siblings]) ->
+    % inner node, only copy info/data from leaf nodes
+    CopiedChildren = copy_rev_tree(SrcFd, DestFd, DestStream, Children),
+    CopiedSiblings = copy_rev_tree(SrcFd, DestFd, DestStream, Siblings),
+    [{RevId, ?REV_MISSING, CopiedChildren} | CopiedSiblings].
+    
+% Copies the documents named by the #doc_info records in InfoBySeq from Db
+% into NewDb: their full infos are looked up in Db, the revision trees are
+% copied into NewDb's file, and both of NewDb's document btrees are
+% extended. Returns the updated NewDb (nothing is committed here).
+copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq) ->
+    DocIds = [Id || #doc_info{id=Id} <- InfoBySeq],
+    Lookups = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, DocIds),
+    CopyInfo =
+    fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
+        Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
+    end,
+    CopiedFullInfos = lists:map(CopyInfo, Lookups),
+    CopiedDocInfos = lists:map(fun couch_doc:to_doc_info/1, CopiedFullInfos),
+    {ok, BySeqBtree} =
+        couch_btree:add_remove(NewDb#db.docinfo_by_seq_btree, CopiedDocInfos, []),
+    {ok, ByIdBtree} =
+        couch_btree:add_remove(NewDb#db.fulldocinfo_by_id_btree, CopiedFullInfos, []),
+    NewDb#db{fulldocinfo_by_id_btree=ByIdBtree, docinfo_by_seq_btree=BySeqBtree}.
+
+
+          
+% Copies every document of Db whose update sequence is greater than
+% NewDb's into NewDb, walking Db's by-sequence btree. Documents are
+% collected and copied in batches: a batch is copied and committed whenever
+% couch_util:should_flush() returns true, and whatever is left over is
+% copied and committed after the walk. Returns the updated NewDb.
+copy_compact_docs(Db, NewDb) ->
+    EnumBySeqFun =
+    fun(#doc_info{update_seq=Seq}=DocInfo, _Offset, {AccNewDb, AccUncopied}) ->
+        case couch_util:should_flush() of
+        true ->
+            % AccUncopied is newest-first; reverse to copy in sequence order
+            NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied])),
+            {ok, {commit_data(NewDb2#db{update_seq=Seq}), []}};
+        false ->    
+            {ok, {AccNewDb, [DocInfo | AccUncopied]}}
+        end
+    end,
+    % resume right after the last sequence already present in NewDb
+    {ok, {NewDb2, Uncopied}} =
+        couch_btree:foldl(Db#db.docinfo_by_seq_btree, NewDb#db.update_seq + 1, EnumBySeqFun, {NewDb, []}),
+
+    case Uncopied of
+    % the head of Uncopied is the most recently visited entry
+    [#doc_info{update_seq=LastSeq} | _] ->
+        commit_data( copy_docs(Db, NewDb2#db{update_seq=LastSeq},
+            lists:reverse(Uncopied)));
+    [] ->
+        NewDb2
+    end.
+
+% Body of the compaction process. Opens (or creates) "<Filepath>.compact",
+% copies the documents it is still missing from Db into it, optionally
+% copies the local docs as well (CopyLocal = true), closes the compaction
+% db and casts {compact_done, CompactFile} to the updater process.
+start_copy_compact_int(#db{name=Name,filepath=Filepath}=Db, CopyLocal) ->
+    CompactFile = Filepath ++ ".compact",
+    ?LOG_DEBUG("Compaction process spawned for db \"~s\"", [Name]),
+    case couch_file:open(CompactFile) of
+    {ok, Fd} ->
+        % an earlier pass left a compaction file behind; continue from it
+        ?LOG_DEBUG("Found existing compaction file for db \"~s\"", [Name]),
+        {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
+    {error, enoent} ->
+        % NOTE(review): unexplained one-second pause before creating the
+        % file - confirm whether it is still needed
+        receive after 1000 -> ok end,
+        {ok, Fd} = couch_file:open(CompactFile, [create]),
+        Header =  #db_header{},
+        ok = couch_file:write_header(Fd, ?HEADER_SIG, Header)
+    end,
+    NewDb = init_db(Name, CompactFile, Fd, Header),
+    NewDb2 = copy_compact_docs(Db, NewDb),
+    NewDb3 =
+    case CopyLocal of
+    true ->
+        % suck up all the local docs into memory and write them to the new db
+        {ok, LocalDocs} = couch_btree:foldl(Db#db.local_docs_btree,
+                fun(Value, _Offset, Acc) -> {ok, [Value | Acc]} end, []),
+        {ok, NewLocalBtree} = couch_btree:add(NewDb2#db.local_docs_btree, LocalDocs),
+        commit_data(NewDb2#db{local_docs_btree=NewLocalBtree});
+    _ ->
+        NewDb2
+    end,
+    close_db(NewDb3),
+    
+    % tell the updater this pass is finished; it decides whether to swap files
+    gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}).
\ No newline at end of file

Modified: incubator/couchdb/trunk/src/couchdb/couch_file.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_file.erl?rev=682560&r1=682559&r2=682560&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_file.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_file.erl Mon Aug  4 18:43:40 2008
@@ -20,6 +20,7 @@
 -export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]).
 -export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]).
 -export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
+-export([close_maybe/1,drop_ref/1,drop_ref/2,add_ref/1,add_ref/2,num_refs/1]).
 
 %%----------------------------------------------------------------------
 %% Args:   Valid Options are [create] and [create,overwrite].
@@ -164,7 +165,25 @@
 %%----------------------------------------------------------------------
 close(Fd) ->
     gen_server:cast(Fd, close).
+    
+% Asynchronously asks the file process to unlink from the calling process
+% and to stop if nothing else links to or is monitored by it.
+close_maybe(Fd) ->
+    gen_server:cast(Fd, {close_maybe, self()}).
+
+% Drops the calling process's reference to the file process.
+drop_ref(Fd) ->
+    drop_ref(Fd, self()).
+    
+% Asynchronously asks the file process to stop monitoring Pid; the file
+% process may then stop if no links or monitors remain.
+drop_ref(Fd, Pid) ->
+    gen_server:cast(Fd, {drop_ref, Pid}).
+
+
+% Registers the calling process as a referrer of the file process.
+add_ref(Fd) ->
+    add_ref(Fd, self()).
 
+% Synchronously asks the file process to start monitoring Pid. The handler
+% replies ok and expects Pid not to be registered already.
+add_ref(Fd, Pid) ->
+    gen_server:call(Fd, {add_ref, Pid}).
+
+% Returns the number of processes currently monitored by the file process.
+num_refs(Fd) ->
+    gen_server:call(Fd, num_refs).
 
 write_header(Fd, Prefix, Data) ->
     TermBin = term_to_binary(Data),
@@ -267,7 +286,7 @@
 
 init_status_error(ReturnPid, Error) ->
     ReturnPid ! {self(), Error}, % signal back error status
-    self() ! self_close, % tell ourself to close async
+    gen_server:cast(self(), close), % tell ourself to close async
     {ok, nil}.
 
 % server functions
@@ -342,16 +361,57 @@
     {ok, <<TermLen:32>>}
         = file:pread(Fd, Pos, 4),
     {ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
-    {reply, {ok, Bin}, Fd}.
+    {reply, {ok, Bin}, Fd};
+handle_call({add_ref, Pid},_From, Fd) ->
+    undefined = put(Pid, erlang:monitor(process, Pid)),
+    {reply, ok, Fd};
+handle_call(num_refs, _From, Fd) ->
+    {monitors, Monitors} =  process_info(self(), monitors),
+    {reply, length(Monitors), Fd}.
+
 
 
 handle_cast(close, Fd) ->
-    {stop,normal,Fd}. % causes terminate to be called
+    {stop,normal,Fd};
+handle_cast({close_maybe, Pid}, Fd) ->
+    catch unlink(Pid),
+    maybe_close_async(Fd);
+handle_cast({drop_ref, Pid}, Fd) ->
+    % don't check return of demonitor. The process could haved crashed causing
+    % the {'DOWN', ...} message to be sent and the process unmonitored.
+    erlang:demonitor(erase(Pid), [flush]),
+    maybe_close_async(Fd).
+
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-handle_info(self_close, State) ->
-    {stop,normal,State};
-handle_info(_Info, State) ->
-    {noreply, State}.
+% gen_server callback for raw messages. A 'DOWN' message from a monitored
+% referrer removes its entry from the process dictionary (asserting that the
+% stored reference matches) and stops the file process if nothing else
+% refers to it. Any other message makes the process exit.
+handle_info({'DOWN', MonitorRef, _Type, Pid, _Info}, Fd) ->
+    MonitorRef = erase(Pid),
+    maybe_close_async(Fd);
+handle_info(Info, Fd) ->
+    exit({error, {Info, Fd}}).
+
+
+
+% Decides whether the file process may stop: true only when its sole link
+% is the underlying Fd and it monitors no process.
+should_close(Fd) ->
+    case process_info(self(), links) of
+    {links, [_, _ | _]} ->
+        % somebody besides our fd is still linked to us
+        false;
+    {links, [Fd]} ->
+        % no linkers left (except our fd). What about monitors?
+        process_info(self(), monitors) == {monitors, []}
+    end.
+
+% Produces the gen_server return value for a callback: stop normally when
+% should_close/1 says nothing refers to the file any more, carry on otherwise.
+maybe_close_async(Fd) ->
+    case should_close(Fd) of
+    false -> {noreply,Fd};
+    true -> {stop,normal,Fd}
+    end.

Modified: incubator/couchdb/trunk/src/couchdb/couch_httpd.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_httpd.erl?rev=682560&r1=682559&r2=682560&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_httpd.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_httpd.erl Mon Aug  4 18:43:40 2008
@@ -156,7 +156,8 @@
 
 handle_db_request(Req, 'PUT', {DbName, []}) ->
     case couch_server:create(DbName, []) of
-        {ok, _Db} ->
+        {ok, Db} ->
+            couch_db:close(Db),
             send_json(Req, 201, {obj, [{ok, true}]});
         {error, database_already_exists} ->
             Msg = io_lib:format("Database ~p already exists.", [DbName]),
@@ -167,9 +168,13 @@
     end;
 
 handle_db_request(Req, Method, {DbName, Rest}) ->
-    case couch_server:open(DbName) of
+    case couch_db:open(DbName, []) of
         {ok, Db} ->
-            handle_db_request(Req, Method, {DbName, Db, Rest});
+            try 
+                handle_db_request(Req, Method, {DbName, Db, Rest})
+            after
+                couch_db:close(Db)
+            end;
         Error ->
             throw(Error)
     end;

Modified: incubator/couchdb/trunk/src/couchdb/couch_rep.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_rep.erl?rev=682560&r1=682559&r2=682560&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_rep.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_rep.erl Mon Aug  4 18:43:40 2008
@@ -43,7 +43,18 @@
 
+% Opens the source and target databases, runs replicate2/5 and closes both
+% again. The nested try/after blocks ensure each successfully opened
+% database is closed even when a later step raises.
 replicate(Source, Target, Options) ->
     {ok, DbSrc} = open_db(Source),
-    {ok, DbTgt} = open_db(Target),
+    try
+        {ok, DbTgt} = open_db(Target),
+        try
+            replicate2(Source, DbSrc, Target, DbTgt, Options)
+        after
+            close_db(DbTgt)
+        end        
+    after
+        close_db(DbSrc)
+    end.
+    
+replicate2(Source, DbSrc, Target, DbTgt, Options) ->
     {ok, HostName} = inet:gethostname(),
 
     RepRecKey = ?LOCAL_DOC_PREFIX ++ HostName ++ ":" ++ Source ++ ":" ++ Target,
@@ -237,7 +248,12 @@
         {ok, "http" ++ DbName ++ "/"}
     end;
 open_db(DbName)->
-    couch_server:open(DbName).
+    couch_db:open(DbName, []).
+
+% Counterpart of open_db/1: a value starting with "http" (a remote database
+% URL) needs no closing; anything else is handed to couch_db:close/1.
+% NOTE(review): the second clause receives what open_db/1 returned, which
+% for local databases is presumably a db handle rather than a name - confirm
+close_db("http" ++ _)->
+    ok;
+close_db(DbName)->
+    couch_db:close(DbName).
 
 
 enum_docs_since(DbUrl, StartSeq, InFun, InAcc) when is_list(DbUrl) ->

Modified: incubator/couchdb/trunk/src/couchdb/couch_server.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_server.erl?rev=682560&r1=682559&r2=682560&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_server.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_server.erl Mon Aug  4 18:43:40 2008
@@ -15,7 +15,7 @@
 -behaviour(application).
 
 -export([start/0,start/1,start/2,stop/0,stop/1]).
--export([open/1,create/2,delete/1,all_databases/0,get_version/0]).
+-export([open/2,create/2,delete/1,all_databases/0,get_version/0]).
 -export([init/1, handle_call/3,sup_start_link/2]).
 -export([handle_cast/2,code_change/3,handle_info/2,terminate/2]).
 -export([dev_start/0,remote_restart/0]).
@@ -25,7 +25,9 @@
 -record(server,{
     root_dir = [],
     dbname_regexp,
-    options=[]
+    remote_restart=[],
+    max_dbs_open=100,
+    current_dbs_open=0
     }).
 
 start() ->
@@ -64,33 +66,41 @@
 sup_start_link(RootDir, Options) ->
     gen_server:start_link({local, couch_server}, couch_server, {RootDir, Options}, []).
 
-open(Filename) ->
-    gen_server:call(couch_server, {open, Filename}).
+open(DbName, Options) ->
+    gen_server:call(couch_server, {open, DbName, Options}).
 
-create(Filename, Options) ->
-    gen_server:call(couch_server, {create, Filename, Options}).
+create(DbName, Options) ->
+    gen_server:call(couch_server, {create, DbName, Options}).
 
-delete(Filename) ->
-    gen_server:call(couch_server, {delete, Filename}).
+delete(DbName) ->
+    gen_server:call(couch_server, {delete, DbName}).
 
 remote_restart() ->
     gen_server:call(couch_server, remote_restart).
 
-init({RootDir, Options}) ->
-    {ok, RegExp} = regexp:parse("^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"),
-    {ok, #server{root_dir=RootDir, dbname_regexp=RegExp, options=Options}}.
-
-check_filename(#server{dbname_regexp=RegExp}, Filename) ->
-    case regexp:match(Filename, RegExp) of
+check_dbname(#server{dbname_regexp=RegExp}, DbName) ->
+    case regexp:match(DbName, RegExp) of
     nomatch ->
         {error, illegal_database_name};
     _Match ->
         ok
     end.
 
-get_full_filename(Server, Filename) ->
-    filename:join([Server#server.root_dir, "./" ++ Filename ++ ".couch"]).
+get_full_filename(Server, DbName) ->
+    filename:join([Server#server.root_dir, "./" ++ DbName ++ ".couch"]).
 
+% gen_server callback: compiles the database-name pattern, creates the
+% private ETS tables that track open databases (by name, by pid and by
+% last-use time), enables exit trapping and builds the server state from
+% Options.
+init({RootDir, Options}) ->
+    {ok, RegExp} = regexp:parse("^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*$"),
+    ets:new(couch_dbs_by_name, [set, private, named_table]),
+    ets:new(couch_dbs_by_pid, [set, private, named_table]),
+    ets:new(couch_dbs_by_lru, [ordered_set, private, named_table]),
+    process_flag(trap_exit, true),
+    % Fall back to the record default when the option is absent. Without a
+    % default the field would hold `undefined`; since any integer compares
+    % less than an atom, the open-db limit would then never be reached.
+    MaxDbsOpen = proplists:get_value(max_dbs_open, Options,
+        (#server{})#server.max_dbs_open),
+    RemoteRestart = proplists:get_value(remote_restart, Options),
+    {ok, #server{root_dir=RootDir,
+                dbname_regexp=RegExp,
+                max_dbs_open=MaxDbsOpen,
+                remote_restart=RemoteRestart}}.
 
 terminate(_Reason, _Server) ->
     ok.
@@ -109,107 +119,141 @@
     {ok, Filenames}.
 
 
+maybe_close_lru_db(#server{current_dbs_open=NumOpen, max_dbs_open=MaxOpen}=Server)
+        when NumOpen < MaxOpen ->
+    {ok, Server};
+maybe_close_lru_db(#server{current_dbs_open=NumOpen}=Server) ->
+    % must free up the lru db.
+    case try_close_lru(now()) of
+    ok -> {ok, Server#server{current_dbs_open=NumOpen-1}};
+    Error -> Error
+    end.
+
+try_close_lru(StartTime) ->
+    LruTime = ets:first(couch_dbs_by_lru),
+    if LruTime > StartTime ->
+        % this means we've looped through all our opened dbs and found them
+        % all in use.
+        {error, all_dbs_active};
+    true ->
+        [{_, DbName}] = ets:lookup(couch_dbs_by_lru, LruTime),
+        [{_, {MainPid, LruTime}}] = ets:lookup(couch_dbs_by_name, DbName),
+        case couch_db:num_refs(MainPid) of
+        0 ->
+            exit(MainPid, kill),
+            receive {'EXIT', MainPid, _Reason} -> ok end,
+            true = ets:delete(couch_dbs_by_lru, LruTime),
+            true = ets:delete(couch_dbs_by_name, DbName),
+            true = ets:delete(couch_dbs_by_pid, MainPid),
+            ok;
+        _NumRefs ->
+            % this still has referrers. Go ahead and give it a current lru time
+            % and try the next one in the table.
+            NewLruTime = now(),
+            true = ets:insert(couch_dbs_by_name, {DbName, {MainPid, NewLruTime}}),
+            true = ets:insert(couch_dbs_by_pid, {MainPid, DbName}),
+            true = ets:delete(couch_dbs_by_lru, LruTime),
+            true = ets:insert(couch_dbs_by_lru, {NewLruTime, DbName}),
+            try_close_lru(StartTime)
+        end
+    end.
+
+
 handle_call(get_root, _From, #server{root_dir=Root}=Server) ->
     {reply, {ok, Root}, Server};
-handle_call({open, Filename}, From, Server) ->
-    case check_filename(Server, Filename) of
-    {error, Error} ->
-        {reply, {error, Error}, Server};
-    ok ->
-        Filepath = get_full_filename(Server, Filename),
-        Result = supervisor:start_child(couch_server_sup,
-            {Filename,
-                {couch_db, open, [Filename, Filepath]},
-                transient ,
-                infinity,
-                supervisor,
-                [couch_db]}),
-        case Result of
-        {ok, Db} ->
-            {reply, {ok, Db}, Server};
-        {error, already_present} ->
-            ok = supervisor:delete_child(couch_server_sup, Filename),
-            % call self recursively
-            handle_call({open, Filename}, From, Server);
-        {error, {already_started, Db}} ->
-            {reply, {ok, Db}, Server};
-        {error, {not_found, _}} ->
-            {reply, not_found, Server};
-        {error, {Error, _}} ->
-            {reply, {error, Error}, Server}
-        end
+handle_call({open, DbName, Options}, {FromPid,_}, Server) ->
+    Filepath = get_full_filename(Server, DbName),
+    LruTime = now(),
+    case ets:lookup(couch_dbs_by_name, DbName) of
+    [] ->    
+        case maybe_close_lru_db(Server) of
+        {ok, Server2} ->
+            case couch_db:start_link(DbName, Filepath, Options) of
+            {ok, MainPid} ->
+                true = ets:insert(couch_dbs_by_name, {DbName, {MainPid, LruTime}}),
+                true = ets:insert(couch_dbs_by_pid, {MainPid, DbName}),
+                true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}),
+                DbsOpen = Server2#server.current_dbs_open + 1,
+                {reply,
+                    couch_db:open_ref_counted(MainPid, FromPid),
+                    Server2#server{current_dbs_open=DbsOpen}};
+            CloseError ->
+                {reply, CloseError, Server2}
+            end;
+        Error ->
+            {reply, Error, Server}
+        end;
+    [{_, {MainPid, PrevLruTime}}] ->
+        true = ets:insert(couch_dbs_by_name, {DbName, {MainPid, LruTime}}),
+        true = ets:delete(couch_dbs_by_lru, PrevLruTime),
+        true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}),
+        {reply, couch_db:open_ref_counted(MainPid, FromPid), Server}
     end;
-handle_call({create, Filename, Options}, _From, Server) ->
-    case check_filename(Server, Filename) of
-    {error, Error} ->
-        {reply, {error, Error}, Server};
+handle_call({create, DbName, Options}, {FromPid,_}, Server) ->
+    case check_dbname(Server, DbName) of
     ok ->
-        Filepath = get_full_filename(Server, Filename),
-        ChildSpec = {Filename,
-                {couch_db, create, [Filename, Filepath, Options]},
-                transient,
-                infinity,
-                supervisor,
-                [couch_db]},
-        Result =
-        case supervisor:delete_child(couch_server_sup, Filename) of
-        ok ->
-            sup_start_child(couch_server_sup, ChildSpec);
-        {error, not_found} ->
-            sup_start_child(couch_server_sup, ChildSpec);
-        {error, running} ->
-            % a server process for this database already started. Maybe kill it
-            case lists:member(overwrite, Options) of
-            true ->
-                supervisor:terminate_child(couch_server_sup, Filename),
-                ok = supervisor:delete_child(couch_server_sup, Filename),
-                sup_start_child(couch_server_sup, ChildSpec);
-            false ->
-                {error, database_already_exists}
-            end
-        end,
-        case Result of
-        {ok, _Db} -> couch_db_update_notifier:notify({created, Filename});
-        _ -> ok
-        end,
-        {reply, Result, Server}
+        Filepath = get_full_filename(Server, DbName),
+
+        case ets:lookup(couch_dbs_by_name, DbName) of
+        [] ->
+            case couch_db:start_link(DbName, Filepath, [create|Options]) of
+            {ok, MainPid} ->
+                LruTime = now(),
+                true = ets:insert(couch_dbs_by_name, {DbName, {MainPid, LruTime}}),
+                true = ets:insert(couch_dbs_by_pid, {MainPid, DbName}),
+                true = ets:insert(couch_dbs_by_lru, {LruTime, DbName}),
+                DbsOpen = Server#server.current_dbs_open + 1,
+                {reply,
+                    couch_db:open_ref_counted(MainPid, FromPid), 
+                    Server#server{current_dbs_open=DbsOpen}};
+            Error ->
+                {reply, Error, Server}
+            end;
+        [_AlreadyRunningDb] ->
+            {reply, {error, file_exists}, Server}
+        end;
+    Error ->
+        {reply, Error, Server}
     end;
-handle_call({delete, Filename}, _From, Server) ->
-    FullFilepath = get_full_filename(Server, Filename),
-    supervisor:terminate_child(couch_server_sup, Filename),
-    supervisor:delete_child(couch_server_sup, Filename),
+handle_call({delete, DbName}, _From, Server) ->
+    FullFilepath = get_full_filename(Server, DbName),
+    Server2 = 
+    case ets:lookup(couch_dbs_by_name, DbName) of
+    [] -> Server;
+    [{_, {Pid, LruTime}}] ->
+        exit(Pid, kill),
+        receive {'EXIT', Pid, _Reason} -> ok end,
+        true = ets:delete(couch_dbs_by_name, DbName),
+        true = ets:delete(couch_dbs_by_pid, Pid),
+        true = ets:delete(couch_dbs_by_lru, LruTime),
+        DbsOpen = Server#server.current_dbs_open - 1,
+        Server#server{current_dbs_open=DbsOpen}
+    end,
     case file:delete(FullFilepath) of
     ok ->
-        couch_db_update_notifier:notify({deleted, Filename}),
-        {reply, ok, Server};
+        couch_db_update_notifier:notify({deleted, DbName}),
+        {reply, ok, Server2};
     {error, enoent} ->
-        {reply, not_found, Server};
+        {reply, not_found, Server2};
     Else ->
-        {reply, Else, Server}
+        {reply, Else, Server2}
     end;
-handle_call(remote_restart, _From, #server{options=Options}=Server) ->
-    case proplists:get_value(remote_restart, Options) of
-    true ->
-        exit(self(), restart);
-    _ ->
-        ok
-    end,
+handle_call(remote_restart, _From, #server{remote_restart=false}=Server) ->
+    {reply, ok, Server};
+handle_call(remote_restart, _From, #server{remote_restart=true}=Server) ->
+    exit(couch_server_sup, restart),
     {reply, ok, Server}.
 
-% this function is just to strip out the child spec error stuff if hit
-sup_start_child(couch_server_sup, ChildSpec) ->
-    case supervisor:start_child(couch_server_sup, ChildSpec) of
-    {error, {Error, _ChildInfo}} ->
-        {error, Error};
-    Else ->
-        Else
-    end.
-
-handle_cast(_Msg, State) ->
-    {noreply,State}.
+handle_cast(Msg, _Server) ->
+    exit({unknown_cast_message, Msg}).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-handle_info(_Info, State) ->
-    {noreply, State}.
+handle_info({'EXIT', Pid, _Reason}, Server) ->
+    [{Pid, DbName}] = ets:lookup(couch_dbs_by_pid, Pid),
+    true = ets:delete(couch_dbs_by_pid, Pid),
+    true = ets:delete(couch_dbs_by_name, DbName),
+    {noreply, Server};
+handle_info(Info, _Server) ->
+    exit({unknown_message, Info}).

Modified: incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl?rev=682560&r1=682559&r2=682560&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_server_sup.erl Mon Aug  4 18:43:40 2008
@@ -74,8 +74,9 @@
     UtilDriverDir = proplists:get_value({"Couch", "UtilDriverDir"}, Ini, ""),
     UpdateNotifierExes = proplists:get_all_values({"Couch", "DbUpdateNotificationProcess"}, Ini),
     FtSearchQueryServer = proplists:get_value({"Couch", "FullTextSearchQueryServer"}, Ini, ""),
-    RemoteRestart = list_to_atom(proplists:get_value({"Couch", "AllowRemoteRestart"}, Ini, "undefined")),
-    ServerOptions = [{remote_restart, RemoteRestart}],
+    RemoteRestart = list_to_atom(proplists:get_value({"Couch", "AllowRemoteRestart"}, Ini, "false")),
+    MaxDbsOpen = proplists:get_value({"Couch", "MaxDbsOpen"}, Ini, 100),
+    ServerOptions = [{remote_restart, RemoteRestart}, {max_dbs_open, MaxDbsOpen}],
     QueryServers = [{Lang, QueryExe} || {{"Couch Query Servers", Lang}, QueryExe} <- Ini],
 
     ChildProcesses =

Modified: incubator/couchdb/trunk/src/couchdb/couch_view.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_view.erl?rev=682560&r1=682559&r2=682560&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_view.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_view.erl Mon Aug  4 18:43:40 2008
@@ -57,22 +57,22 @@
     Pid.
     
 get_updated_group(Pid) ->
-	Mref = erlang:monitor(process, Pid),
+    Mref = erlang:monitor(process, Pid),
     receive
-	{'DOWN', Mref, _, _, Reason} ->
-	    throw(Reason)
+    {'DOWN', Mref, _, _, Reason} ->
+        throw(Reason)
     after 0 ->
-	    Pid ! {self(), get_updated},
-	    receive
-    	{Pid, Response} ->
-    	    erlang:demonitor(Mref),
-    	    receive
-        		{'DOWN', Mref, _, _, _} -> ok
-        	    after 0 -> ok
-    	    end,
-    	    Response;
-    	{'DOWN', Mref, _, _, Reason} ->
-    	    throw(Reason)
+        Pid ! {self(), get_updated},
+        receive
+        {Pid, Response} ->
+            erlang:demonitor(Mref),
+            receive
+                {'DOWN', Mref, _, _, _} -> ok
+                after 0 -> ok
+            end,
+            Response;
+        {'DOWN', Mref, _, _, Reason} ->
+            throw(Reason)
         end
     end.
 
@@ -216,10 +216,7 @@
     {ok, #server{root_dir=RootDir}}.
 
 terminate(_Reason, _) ->
-    catch ets:delete(couch_views_by_name),
-    catch ets:delete(couch_views_by_updater),
-    catch ets:delete(couch_views_by_db),
-    catch ets:delete(couch_views_temp_fd_by_db).
+    ok.
 
 
 handle_call({start_temp_updater, DbName, Lang, MapSrc, RedSrc}, _From, #server{root_dir=Root}=Server) ->
@@ -317,7 +314,7 @@
 
 start_temp_update_loop(DbName, Fd, Lang, MapSrc, RedSrc) ->
     NotifyPids = get_notify_pids(1000),
-    case couch_server:open(DbName) of
+    case couch_db:open(DbName, []) of
     {ok, Db} ->
         View = #view{map_names=["_temp"],
             id_num=0,
@@ -331,16 +328,20 @@
             def_lang=Lang,
             id_btree=nil},
         Group2 = init_group(Db, Fd, Group,nil),
-        temp_update_loop(Group2, NotifyPids);
+        couch_db:monitor(Db),
+        couch_db:close(Db),
+        temp_update_loop(DbName, Group2, NotifyPids);
     Else ->
         exit(Else)
     end.
 
-temp_update_loop(Group, NotifyPids) ->
-    {ok, Group2} = update_group(Group),
+temp_update_loop(DbName, Group, NotifyPids) ->
+    {ok, Db} = couch_db:open(DbName, []),
+    {ok, Group2} = update_group(Group#group{db=Db}),
+    couch_db:close(Db),
     [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
     garbage_collect(),
-    temp_update_loop(Group2, get_notify_pids(10000)).
+    temp_update_loop(DbName, Group2, get_notify_pids(10000)).
 
 
 reset_group(#group{views=Views}=Group) ->
@@ -355,21 +356,21 @@
     
 start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
     {Db, Group} =
-    case (catch couch_server:open(DbName)) of
+    case (catch couch_db:open(DbName, [])) of
     {ok, Db0} ->
         case (catch couch_db:open_doc(Db0, GroupId)) of
         {ok, Doc} ->
             {Db0, design_doc_to_view_group(Doc)};
- 		Else ->
- 		    delete_index_file(RootDir, DbName, GroupId),
- 		    exit(Else)
- 		end;
- 	Else ->
- 	    delete_index_file(RootDir, DbName, GroupId),
- 	    exit(Else)
- 	end,
- 	FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view",
- 	Group2 =
+        Else ->
+            delete_index_file(RootDir, DbName, GroupId),
+            exit(Else)
+        end;
+    Else ->
+        delete_index_file(RootDir, DbName, GroupId),
+        exit(Else)
+    end,
+    FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view",
+    Group2 =
     case couch_file:open(FileName) of
     {ok, Fd} ->
         Sig = Group#group.sig,
@@ -386,7 +387,8 @@
         Error    -> throw(Error)
         end
     end,
-    
+    couch_db:monitor(Db),
+    couch_db:close(Db),
     update_loop(RootDir, DbName, GroupId, Group2, NotifyPids).
 
 reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
@@ -396,14 +398,22 @@
     init_group(Db, Fd, reset_group(Group), nil).
 
 update_loop(RootDir, DbName, GroupId, #group{sig=Sig,fd=Fd}=Group, NotifyPids) ->
-    try update_group(Group) of
-    {ok, Group2} ->    
+    {ok, Db}= couch_db:open(DbName, []),
+    Result =
+    try
+        update_group(Group#group{db=Db})
+    catch
+        throw: restart -> restart
+    after
+        couch_db:close(Db)
+    end,
+    case Result of
+    {ok, Group2} ->
         HeaderData = {Sig, get_index_header_data(Group2)},
         ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData),
         [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
         garbage_collect(),
-        update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000))
-    catch
+        update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000));
     restart ->
         couch_file:close(Group#group.fd),
         start_update_loop(RootDir, DbName, GroupId, NotifyPids ++ get_notify_pids())
@@ -414,20 +424,23 @@
     receive
     {Pid, get_updated} ->
         [Pid | get_notify_pids()];
+    {'DOWN', _MonitorRef, _Type, _Pid, _Info} ->
+        ?LOG_DEBUG("View monitor received parent db shutdown notification. Shutting down instance.", []),
+        exit(normal);
     Else ->
         ?LOG_ERROR("Unexpected message in view updater: ~p", [Else]),
         exit({error, Else})
     after Wait ->
         exit(wait_timeout)
-	end.
+    end.
 % then keep getting all available and return.
 get_notify_pids() ->
     receive
     {Pid, get_updated} ->
         [Pid | get_notify_pids()]
-	after 0 ->
-	    []
-	end.
+    after 0 ->
+        []
+    end.
     
 update_group(#group{db=Db,current_seq=CurrentSeq, views=Views}=Group) ->
     ViewEmptyKVs = [{View, []} || View <- Views],