Posted to commits@couchdb.apache.org by da...@apache.org on 2009/05/17 23:31:18 UTC

svn commit: r775761 - in /couchdb/branches/tail_header: ./ share/www/script/test/ src/couchdb/

Author: damien
Date: Sun May 17 21:31:17 2009
New Revision: 775761

URL: http://svn.apache.org/viewvc?rev=775761&view=rev
Log:
Tail append headers. Replication doesn't work yet; it may need file format upgrade code.

Added:
    couchdb/branches/tail_header/
      - copied from r775759, couchdb/trunk/
Modified:
    couchdb/branches/tail_header/share/www/script/test/basics.js
    couchdb/branches/tail_header/src/couchdb/couch_db.erl
    couchdb/branches/tail_header/src/couchdb/couch_db.hrl
    couchdb/branches/tail_header/src/couchdb/couch_db_updater.erl
    couchdb/branches/tail_header/src/couchdb/couch_doc.erl
    couchdb/branches/tail_header/src/couchdb/couch_file.erl
    couchdb/branches/tail_header/src/couchdb/couch_httpd_db.erl
    couchdb/branches/tail_header/src/couchdb/couch_stream.erl
    couchdb/branches/tail_header/src/couchdb/couch_view_group.erl

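The core idea, for reviewers following along: couch_file no longer keeps a fixed, doubly written 2K header at the start of the file. Instead, each committed header is appended at the tail of the file on a 4096-byte block boundary, protected by an MD5 checksum, and read_header/1 scans backwards from EOF for the most recent valid one. A minimal usage sketch (not from the patch itself, but using only the couch_file calls it introduces):

    {ok, Fd} = couch_file:open("example.couch", [create]),
    ok = couch_file:write_header(Fd, {example, header_1}),
    {ok, {example, header_1}} = couch_file:read_header(Fd),
    ok = couch_file:write_header(Fd, {example, header_2}),
    %% the backward scan finds the newest valid header first
    {ok, {example, header_2}} = couch_file:read_header(Fd),
    couch_file:close(Fd).
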
Modified: couchdb/branches/tail_header/share/www/script/test/basics.js
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/share/www/script/test/basics.js?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/share/www/script/test/basics.js (original)
+++ couchdb/branches/tail_header/share/www/script/test/basics.js Sun May 17 21:31:17 2009
@@ -133,7 +133,7 @@
 
     // make sure we can still open the old rev of the deleted doc
     T(db.open(existingDoc._id, {rev: existingDoc._rev}) != null);
-
+    console.log("db.info: " + db.info().update_seq);
     // make sure restart works
     T(db.ensureFullCommit().ok);
     restartServer();

Modified: couchdb/branches/tail_header/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_db.erl?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_db.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_db.erl Sun May 17 21:31:17 2009
@@ -154,7 +154,7 @@
 purge_docs(#db{update_pid=UpdatePid}, IdsRevs) ->
     gen_server:call(UpdatePid, {purge_docs, IdsRevs}).
     
-get_committed_update_seq(#db{header=#db_header{update_seq=Seq}}) ->
+get_committed_update_seq(#db{committed_update_seq=Seq}) ->
     Seq.
 
 get_update_seq(#db{update_seq=Seq})->
@@ -566,92 +566,50 @@
     {Fd, StreamPointer, Len};
   
 flush_binary(Fd, {OtherFd, StreamPointer, Len}) ->
-    with_stream(Fd, fun(OutputStream) -> 
-        % written to a different file (or a closed file
-        % instance, which will cause an error)
-        ok = couch_stream:set_min_buffer(OutputStream, Len),
-        {ok, {NewStreamPointer, Len}, _EndSp} =
-        couch_stream:foldl(OtherFd, StreamPointer, Len,
-            fun(Bin, {BeginPointer, SizeAcc}) ->
-                {ok, Pointer} = couch_stream:write(OutputStream, Bin),
-                case SizeAcc of
-                0 -> % this was the first write, record the pointer
-                    {ok, {Pointer, size(Bin)}};
-                _ ->
-                    {ok, {BeginPointer, SizeAcc  + size(Bin)}}
-                end
-            end,
-            {{0,0}, 0}),
-        {Fd, NewStreamPointer, Len}
-    end);
+    {NewStreamData, Len} = 
+            couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
+    {Fd, NewStreamData, Len};
                          
 flush_binary(Fd, Bin) when is_binary(Bin) ->
-    with_stream(Fd, fun(OutputStream) -> 
-        ok = couch_stream:set_min_buffer(OutputStream, size(Bin)),
-        {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
-        {Fd, StreamPointer, size(Bin)}
+    with_stream(Fd, fun(OutputStream) ->
+        couch_stream:write(OutputStream, Bin)
     end);
                  
 flush_binary(Fd, {StreamFun, undefined}) when is_function(StreamFun) ->
-    % max_attachment_chunk_size control the max we buffer in memory
-    MaxChunkSize = list_to_integer(couch_config:get("couchdb", 
-        "max_attachment_chunk_size","4294967296")),
     with_stream(Fd, fun(OutputStream) -> 
+        io:format("OutputStream:~p~n", [OutputStream]),
         % StreamFun(MaxChunkSize, WriterFun) must call WriterFun
-        % once for each chunk of the attachment.
-        WriterFun = make_writer_fun(OutputStream),
-        {ok, {TotalLength, NewStreamPointer}} = 
-            StreamFun(MaxChunkSize, WriterFun, {0, nil}),
-        {Fd, NewStreamPointer, TotalLength}
+        % once for each chunk of the attachment.
+        StreamFun(4096,
+            % WriterFun({Length, Binary}, State)
+            % WriterFun({0, _Footers}, State)
+            % Called with Length == 0 on the last time.
+            % WriterFun returns NewState.
+            fun({0, _Footers}, _) ->
+                ok;
+            ({_Length, Bin}, _) ->
+                couch_stream:write(OutputStream, Bin)
+            end, ok)
     end);
              
 flush_binary(Fd, {Fun, Len}) when is_function(Fun) ->
     with_stream(Fd, fun(OutputStream) -> 
-        ok = couch_stream:set_min_buffer(OutputStream, Len),
-        {ok, StreamPointer} =
-            write_streamed_attachment(OutputStream, Fun, Len, nil),
-        {Fd, StreamPointer, Len}
+        write_streamed_attachment(OutputStream, Fun, Len)
     end).
             
 with_stream(Fd, Fun) ->
     {ok, OutputStream} = couch_stream:open(Fd),
-    Result = Fun(OutputStream),
-    couch_stream:close(OutputStream),
-    Result.
-
-make_writer_fun(Stream) ->
-    % WriterFun({Length, Binary}, State)
-    % WriterFun({0, _Footers}, State)
-    % Called with Length == 0 on the last time.
-    % WriterFun returns NewState.
-    fun
-        ({0, _Footers}, {FinalLen, SpFin}) ->
-            % last block, return the final tuple
-            {ok, {FinalLen, SpFin}};
-        ({Length, Bin}, {Total, nil}) ->
-            % save StreamPointer 
-            ok = couch_stream:set_min_buffer(Stream, Length),
-            {ok, StreamPointer} = couch_stream:write(Stream, Bin),
-            {Total+Length, StreamPointer};
-        ({Length, Bin}, {Total, SpAcc}) ->
-            % write the Bin to disk 
-            ok = couch_stream:set_min_buffer(Stream, Length),
-            {ok, _Sp} = couch_stream:write(Stream, Bin),
-            {Total+Length, SpAcc}
-    end.
+    Fun(OutputStream),
+    {StreamInfo, Len} = couch_stream:close(OutputStream),
+    {Fd, StreamInfo, Len}.
+
     
-write_streamed_attachment(_Stream, _F, 0, SpAcc) ->
-    {ok, SpAcc};
-write_streamed_attachment(Stream, F, LenLeft, nil) ->
-    Bin = F(),
-    TruncatedBin = check_bin_length(LenLeft, Bin),
-    {ok, SpAcc} = couch_stream:write(Stream, TruncatedBin),
-    write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc);
-write_streamed_attachment(Stream, F, LenLeft, SpAcc) ->
+write_streamed_attachment(_Stream, _F, 0) ->
+    ok;
+write_streamed_attachment(Stream, F, LenLeft) ->
     Bin = F(),
-    TruncatedBin = check_bin_length(LenLeft, Bin),
-    {ok, _} = couch_stream:write(Stream, TruncatedBin),
-    write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc).
+    TruncatedBin = check_bin_length(LenLeft, Bin),
+    ok = couch_stream:write(Stream, TruncatedBin),
+    write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin)).
 
 %% on rare occasions ibrowse seems to process a chunked response incorrectly
 %% and include an extra "\r" in the last chunk.  This code ensures that we 
@@ -865,14 +823,13 @@
     [{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].
 
     
-make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
+make_doc(#db{fd=Fd}, Id, Deleted, Bp, RevisionPath) ->
     {BodyData, BinValues} =
     case Bp of
     nil ->
         {[], []};
     _ ->
-        {ok, {BodyData0, BinValues0}} =
-            couch_stream:read_term( Db#db.summary_stream, Bp),
+        {ok, {BodyData0, BinValues0}} = couch_file:pread_term(Fd, Bp),
         {BodyData0,
             [{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]}
     end,

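A hedged sketch of the simplified stream contract used above: couch_stream:write/2 now returns plain ok, and the stream pointer, which has become a list of file positions, comes back together with the total length only when the stream is closed (this mirrors with_stream/2 in the diff; the function name is hypothetical):

    flush_binary_sketch(Fd, Bin) when is_binary(Bin) ->
        {ok, OutputStream} = couch_stream:open(Fd),
        ok = couch_stream:write(OutputStream, Bin),           % buffered, returns ok
        {StreamInfo, Len} = couch_stream:close(OutputStream), % positions + length
        {Fd, StreamInfo, Len}.
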
Modified: couchdb/branches/tail_header/src/couchdb/couch_db.hrl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_db.hrl?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_db.hrl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_db.hrl Sun May 17 21:31:17 2009
@@ -115,7 +115,7 @@
 -record(db_header,
     {disk_version = ?LATEST_DISK_VERSION,  
      update_seq = 0,
-     summary_stream_state = nil,
+     unused,
      fulldocinfo_by_id_btree_state = nil,
      docinfo_by_seq_btree_state = nil,
      local_docs_btree_state = nil,
@@ -133,7 +133,7 @@
     fd,
     fd_ref_counter,
     header = #db_header{},
-    summary_stream,
+    committed_update_seq,
     fulldocinfo_by_id_btree,
     docinfo_by_seq_btree,
     local_docs_btree,

Modified: couchdb/branches/tail_header/src/couchdb/couch_db_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_db_updater.erl?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_db_updater.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_db_updater.erl Sun May 17 21:31:17 2009
@@ -19,18 +19,18 @@
 
 -include("couch_db.hrl").
 
--define(HEADER_SIG, <<$g, $m, $k, 0>>).
 
 init({MainPid, DbName, Filepath, Fd, Options}) ->
+    io:format("Init fd:~p~n", [Fd]),
     case lists:member(create, Options) of
     true ->
         % create a new header and writes it to the file
         Header =  #db_header{},
-        ok = couch_file:write_header(Fd, ?HEADER_SIG, Header),
+        ok = couch_file:write_header(Fd, Header),
         % delete any old compaction files that might be hanging around
         file:delete(Filepath ++ ".compact");
     false ->
-        {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG)
+        {ok, Header} = couch_file:read_header(Fd)
     end,
     
     Db = init_db(DbName, Filepath, Fd, Header),
@@ -56,8 +56,9 @@
     end;
 handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
     {reply, ok, Db}; % no data waiting, return ok immediately
-handle_call(full_commit, _From, Db) ->
-    {reply, ok, commit_data(Db)}; % commit the data and return ok
+handle_call(full_commit, _From,  #db{fd=Fd,update_seq=Seq}=Db) ->
+    ok = couch_file:sync(Fd),
+    {reply, ok, Db#db{waiting_delayed_commit=nil,committed_update_seq=Seq}}; % commit the data and return ok
 handle_call(increment_update_seq, _From, Db) ->
     Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
     ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
@@ -158,7 +159,7 @@
     end;
 handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
     {ok, NewFd} = couch_file:open(CompactFilepath),
-    {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG),
+    {ok, NewHeader} = couch_file:read_header(NewFd),
     #db{update_seq=NewSeq} = NewDb =
             init_db(Db#db.name, Filepath, NewFd, NewHeader),
     unlink(NewFd),
@@ -190,8 +191,9 @@
         {noreply, Db2}
     end.
 
-handle_info(delayed_commit, Db) ->
-    {noreply, commit_data(Db#db{waiting_delayed_commit=nil})}.
+handle_info(delayed_commit, #db{update_seq=Seq}=Db) ->
+    ok = couch_file:sync(Db#db.fd),
+    {noreply, Db#db{waiting_delayed_commit=nil,committed_update_seq=Seq}}.
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -287,8 +289,6 @@
     _ -> throw({database_disk_version_error, "Incorrect disk header version"})
     end,
     Header = simple_upgrade_record(Header0, #db_header{}),
-    {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
-    ok = couch_stream:set_min_buffer(SummaryStream, 10000),
     Less = fun less_docid/2,
             
     {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
@@ -319,7 +319,6 @@
         fd=Fd,
         fd_ref_counter = RefCntr,
         header=Header,
-        summary_stream = SummaryStream,
         fulldocinfo_by_id_btree = IdBtree,
         docinfo_by_seq_btree = SeqBtree,
         local_docs_btree = LocalDocsBtree,
@@ -333,8 +332,7 @@
         }.
 
 
-close_db(#db{ fd_ref_counter = RefCntr, summary_stream = SummaryStream}) ->
-    couch_stream:close(SummaryStream),
+close_db(#db{fd_ref_counter = RefCntr}) ->
     couch_ref_counter:drop(RefCntr).
     
 
@@ -387,7 +385,7 @@
                     ?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
                     throw(retry)
                 end,
-                {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
+                {ok, NewSummaryPointer} = couch_file:append_term(Fd, {Doc#doc.body, Bins}),
                 {IsDeleted, NewSummaryPointer, UpdateSeq};
             _ ->
                 Value
@@ -553,7 +551,6 @@
 commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
     Header2 = Header#db_header{
         update_seq = Db#db.update_seq,
-        summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
         docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
         fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
         local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
@@ -563,53 +560,56 @@
     if Header == Header2 ->
         Db;
     Delay and (Db#db.waiting_delayed_commit == nil) ->
+        ok = couch_file:write_header(Fd, Header2),
         Db#db{waiting_delayed_commit=
                 erlang:send_after(1000, self(), delayed_commit)};
     Delay ->
-        Db;
+        ok = couch_file:write_header(Fd, Header2),
+        Db#db{header=Header2};
     true ->
-        if Db#db.waiting_delayed_commit /= nil ->
+        if not is_atom(Db#db.waiting_delayed_commit) ->
             case erlang:cancel_timer(Db#db.waiting_delayed_commit) of
             false -> receive delayed_commit -> ok after 0 -> ok end;
             _ -> ok
             end;
         true -> ok
         end,
-        ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2),
-        Db#db{waiting_delayed_commit=nil,header=Header2}
+        ok = couch_file:write_header(Fd, Header2),
+        ok = couch_file:sync(Fd),
+        Db#db{waiting_delayed_commit=nil,header=Header2,committed_update_seq=Db#db.update_seq}
     end.
 
-copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
-    {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
+copy_raw_doc(SrcFd, SrcSp, DestFd) ->
+    {ok, {BodyData, BinInfos}} = couch_file:pread_term(SrcFd, SrcSp),
     % copy the bin values
     NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
-        {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
+        {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
         {Name, {Type, NewBinSp, Len}}
         end, BinInfos),
     % now write the document summary
-    {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}),
+    {ok, Sp} = couch_file:append_term(DestFd, {BodyData, NewBinInfos}),
     Sp.
 
-copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
+copy_rev_tree(_SrcFd, _DestFd, []) ->
     [];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{Start, Tree} | RestTree]) ->
+copy_rev_tree(SrcFd, DestFd, [{Start, Tree} | RestTree]) ->
     % root inner node, only copy info/data from leaf nodes
-    [Tree2] = copy_rev_tree(SrcFd, DestFd, DestStream, [Tree]),
-    [{Start, Tree2} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
+    [Tree2] = copy_rev_tree(SrcFd, DestFd, [Tree]),
+    [{Start, Tree2} | copy_rev_tree(SrcFd, DestFd, RestTree)];
+copy_rev_tree(SrcFd, DestFd, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
     % This is a leaf node, copy it over
-    NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
-    [{RevId, {IsDel, NewSp, Seq}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
+    NewSp = copy_raw_doc(SrcFd, Sp, DestFd),
+    [{RevId, {IsDel, NewSp, Seq}, []} | copy_rev_tree(SrcFd, DestFd, RestTree)];
+copy_rev_tree(SrcFd, DestFd, [{RevId, _, SubTree} | RestTree]) ->
     % inner node, only copy info/data from leaf nodes
-    [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)].
+    [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, SubTree)} | copy_rev_tree(SrcFd, DestFd, RestTree)].
     
-copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq, Retry) ->
+copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
     Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
     LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
     NewFullDocInfos0 = lists:map(
         fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
-            Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
+            Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, RevTree)}
         end, LookupResults),
     NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos0),
     NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
@@ -642,7 +642,7 @@
         if TotalCopied rem 1000 == 0 ->
             NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
             if TotalCopied rem 10000 == 0 ->
-                {ok, {commit_data(NewDb2#db{update_seq=Seq}), [], TotalCopied + 1}};
+                {ok, {commit_data(NewDb2#db{update_seq=Seq}, true), [], TotalCopied + 1}};
             true ->
                 {ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}}
             end;
@@ -668,7 +668,7 @@
         NewDb4 = NewDb3
     end,
     
-    commit_data(NewDb4#db{update_seq=Db#db.update_seq}).
+    commit_data(NewDb4#db{update_seq=Db#db.update_seq}, true).
 
 start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
     CompactFile = Filepath ++ ".compact",
@@ -677,16 +677,16 @@
     {ok, Fd} ->
         couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
         Retry = true,
-        {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
+        {ok, Header} = couch_file:read_header(Fd);
     {error, enoent} ->
         couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
         {ok, Fd} = couch_file:open(CompactFile, [create]),
         Retry = false,
-        ok = couch_file:write_header(Fd, ?HEADER_SIG, Header=#db_header{})
+        ok = couch_file:write_header(Fd, Header=#db_header{})
     end,
     NewDb = init_db(Name, CompactFile, Fd, Header),
     unlink(Fd),
-    NewDb2 = copy_compact(Db, NewDb, Retry),
+    NewDb2 = copy_compact(Db, NewDb#db{waiting_delayed_commit=never}, Retry),
     
     gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}),
     close_db(NewDb2).

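The commit path above can be summarized in a hedged sketch (assuming couch_db.hrl is included for the #db record): the header is appended eagerly, since tail appends are cheap, and durability comes from the explicit sync, after which committed_update_seq is advanced:

    commit_sketch(#db{fd=Fd, update_seq=Seq}=Db, NewHeader) ->
        ok = couch_file:write_header(Fd, NewHeader), % tail-append the header
        ok = couch_file:sync(Fd),                    % make it durable
        Db#db{header=NewHeader, waiting_delayed_commit=nil,
              committed_update_seq=Seq}.
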
Modified: couchdb/branches/tail_header/src/couchdb/couch_doc.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_doc.erl?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_doc.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_doc.erl Sun May 17 21:31:17 2009
@@ -250,13 +250,9 @@
 
 
 bin_foldl(Bin, Fun, Acc) when is_binary(Bin) ->
-    case Fun(Bin, Acc) of
-        {ok, Acc2} -> {ok, Acc2};
-        {done, Acc2} -> {ok, Acc2}
-    end;
-bin_foldl({Fd, Sp, Len}, Fun, Acc) ->
-    {ok, Acc2, _Sp2} = couch_stream:foldl(Fd, Sp, Len, Fun, Acc),
-    {ok, Acc2}.
+    Fun(Bin, Acc);
+bin_foldl({Fd, Sp, _Len}, Fun, Acc) ->
+    couch_stream:foldl(Fd, Sp, Fun, Acc).
 
 bin_size(Bin) when is_binary(Bin) ->
     size(Bin);
@@ -265,9 +261,8 @@
 
 bin_to_binary(Bin) when is_binary(Bin) ->
     Bin;
-bin_to_binary({Fd, Sp, Len}) ->
-    {ok, Bin, _Sp2} = couch_stream:read(Fd, Sp, Len),
-    Bin.
+bin_to_binary({Fd, Sp, _Len}) ->
+    iolist_to_binary(lists:reverse(couch_stream:foldl(Fd, Sp,
+        fun(Bin, Acc) -> [Bin|Acc] end, []))).
 
 get_validate_doc_fun(#doc{body={Props}}) ->
     Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),

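Hedged usage sketch for the new bin_foldl/3 (Fd, PosList and Len are hypothetical bindings for an attachment value as built by make_doc/5): the fold visits each stored chunk in turn, so an attachment can be processed without materializing it as a single binary:

    TotalBytes = couch_doc:bin_foldl({Fd, PosList, Len},
        fun(Segment, Acc) -> Acc + iolist_size(Segment) end, 0).
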
Modified: couchdb/branches/tail_header/src/couchdb/couch_file.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_file.erl?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_file.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_file.erl Sun May 17 21:31:17 2009
@@ -15,10 +15,11 @@
 
 -include("couch_db.hrl").
 
--define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
+-define(SIZE_BLOCK, 4096).
 
--export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]).
--export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]).
+-export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2]).
+-export([append_term/2, pread_term/2, pread_iolist/2, write_header/2]).
+-export([pread_binary/2, read_header/1, truncate/2]).
 -export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
 
 %%----------------------------------------------------------------------
@@ -52,39 +53,6 @@
 
 
 %%----------------------------------------------------------------------
-%% Args:    Pos is the offset from the beginning of the file, Bytes is
-%%  is the number of bytes to read.
-%% Returns: {ok, Binary} where Binary is a binary data from disk
-%%  or {error, Reason}.
-%%----------------------------------------------------------------------
-
-pread(Fd, Pos, Bytes) when Bytes > 0 ->
-    gen_server:call(Fd, {pread, Pos, Bytes}, infinity).
-
-
-%%----------------------------------------------------------------------
-%% Args:    Pos is the offset from the beginning of the file, Bin is
-%%  is the binary to write
-%% Returns: ok
-%%  or {error, Reason}.
-%%----------------------------------------------------------------------
-
-pwrite(Fd, Pos, Bin) ->
-    gen_server:call(Fd, {pwrite, Pos, Bin}, infinity).
-
-%%----------------------------------------------------------------------
-%% Purpose: To append a segment of zeros to the end of the file.
-%% Args:    Bytes is the number of bytes to append to the file.
-%% Returns: {ok, Pos} where Pos is the file offset to the beginning of
-%%  the new segments.
-%%  or {error, Reason}.
-%%----------------------------------------------------------------------
-
-expand(Fd, Bytes) when Bytes > 0 ->
-    gen_server:call(Fd, {expand, Bytes}, infinity).
-
-
-%%----------------------------------------------------------------------
 %% Purpose: To append an Erlang term to the end of the file.
 %% Args:    Erlang term to serialize and append to the file.
 %% Returns: {ok, Pos} where Pos is the file offset to the beginning the
@@ -105,7 +73,9 @@
 %%----------------------------------------------------------------------
     
 append_binary(Fd, Bin) ->
-    gen_server:call(Fd, {append_bin, Bin}, infinity).
+    Size = iolist_size(Bin),
+    SizePrependedBin = iolist_to_binary([<<Size:32/integer>>, Bin]),
+    gen_server:call(Fd, {append_bin, SizePrependedBin}, infinity).
 
 
 %%----------------------------------------------------------------------
@@ -119,6 +89,7 @@
     {ok, Bin} = pread_binary(Fd, Pos),
     {ok, binary_to_term(Bin)}.
 
+
 %%----------------------------------------------------------------------
 %% Purpose: Reads a binary from a file that was written with append_binary
 %% Args:    Pos, the offset into the file where the term is serialized.
@@ -127,8 +98,22 @@
 %%----------------------------------------------------------------------
 
 pread_binary(Fd, Pos) ->
-    gen_server:call(Fd, {pread_bin, Pos}, infinity).
+    {ok, L} = pread_iolist(Fd, Pos),
+    {ok, iolist_to_binary(L)}.
 
+pread_iolist(Fd, Pos) ->
+    {ok, LenIolist, NextPos} = read_raw_iolist(Fd, Pos, 4),
+    <<Len:32/integer>> = iolist_to_binary(LenIolist),
+    {ok, Iolist, _} = read_raw_iolist(Fd, NextPos, Len),
+    {ok, Iolist}.
+
+read_raw_iolist(Fd, Pos, Len) when (Pos rem ?SIZE_BLOCK) == 0 ->
+    read_raw_iolist(Fd, Pos + 1, Len);
+read_raw_iolist(Fd, Pos, Len) ->
+    BlockOffset = Pos rem ?SIZE_BLOCK,
+    TotalBytes = calculate_total_read_len(BlockOffset, Len),
+    {ok, <<RawBin:TotalBytes/binary>>} = gen_server:call(Fd, {pread, Pos, TotalBytes}, infinity),
+    {ok, remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes}.
 
 %%----------------------------------------------------------------------
 %% Purpose: The length of a file, in bytes.
@@ -167,99 +152,22 @@
     catch unlink(Fd),
     Result.
 
-
-write_header(Fd, Prefix, Data) ->
-    TermBin = term_to_binary(Data),
-    % the size of all the bytes written to the header, including the md5 signature (16 bytes)
-    FilledSize = size(Prefix) + size(TermBin) + 16,
-    {TermBin2, FilledSize2} =
-    case FilledSize > ?HEADER_SIZE of
-    true ->
-        % too big!
-        {ok, Pos} = append_binary(Fd, TermBin),
-        PtrBin = term_to_binary({pointer_to_header_data, Pos}),
-        {PtrBin, size(Prefix) + size(PtrBin) + 16};
-    false ->
-        {TermBin, FilledSize}
-    end,
-    ok = sync(Fd),
-    % pad out the header with zeros, then take the md5 hash
-    PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
-    Sig = erlang:md5([TermBin2, PadZeros]),
-    % now we assemble the final header binary and write to disk
-    WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
-    ?HEADER_SIZE = size(WriteBin), % sanity check
-    DblWriteBin = [WriteBin, WriteBin],
-    ok = pwrite(Fd, 0, DblWriteBin),
-    ok = sync(Fd).
-
-
-read_header(Fd, Prefix) ->
-    {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)),
-    <<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
-    Result =
-    % read the first header
-    case extract_header(Prefix, Bin1) of
-    {ok, Header1} ->
-        case extract_header(Prefix, Bin2) of
-        {ok, Header2} ->
-            case Header1 == Header2 of
-            true ->
-                % Everything is completely normal!
-                {ok, Header1};
-            false ->
-                % To get here we must have two different header versions with signatures intact.
-                % It's weird but possible (a commit failure right at the 2k boundary). Log it and take the first.
-                ?LOG_INFO("Header version differences.~nPrimary Header: ~p~nSecondary Header: ~p", [Header1, Header2]),
-                {ok, Header1}
-            end;
-        Error ->
-            % error reading second header. It's ok, but log it.
-            ?LOG_INFO("Secondary header corruption (error: ~p). Using primary header.", [Error]),
-            {ok, Header1}
-        end;
-    Error ->
-        % error reading primary header
-        case extract_header(Prefix, Bin2) of
-        {ok, Header2} ->
-            % log corrupt primary header. It's ok since the secondary is still good.
-            ?LOG_INFO("Primary header corruption (error: ~p). Using secondary header.", [Error]),
-            {ok, Header2};
-        _ ->
-            % error reading secondary header too
-            % return the error, no need to log anything as the caller will be responsible for dealing with the error.
-            Error
-        end
-    end,
-    case Result of
-    {ok, {pointer_to_header_data, Ptr}} ->
-        pread_term(Fd, Ptr);
-    _ ->
-        Result
+read_header(Fd) ->
+    case gen_server:call(Fd, find_header, infinity) of
+    {ok, Bin} ->
+        {ok, binary_to_term(Bin)};
+    Else ->
+        Else
     end.
     
-extract_header(Prefix, Bin) ->
-    SizeOfPrefix = size(Prefix),
-    SizeOfTermBin = ?HEADER_SIZE -
-                    SizeOfPrefix -
-                    16,     % md5 sig
-
-    <<HeaderPrefix:SizeOfPrefix/binary, TermBin:SizeOfTermBin/binary, Sig:16/binary>> = Bin,
-
-    % check the header prefix
-    case HeaderPrefix of
-    Prefix ->
-        % check the integrity signature
-        case erlang:md5(TermBin) == Sig of
-        true ->
-            Header = binary_to_term(TermBin),
-            {ok, Header};
-        false ->
-            header_corrupt
-        end;
-    _ ->
-        unknown_header_type
-    end.
+write_header(Fd, Data) ->
+    Bin = term_to_binary(Data),
+    Md5 = erlang:md5(Bin),
+    % now we assemble the final header binary and write to disk
+    FinalBin = <<Md5/binary, Bin/binary>>,
+    gen_server:call(Fd, {write_header, FinalBin}, infinity).
+    
+
 
 
 init_status_error(ReturnPid, Ref, Error) ->
@@ -319,11 +227,6 @@
 
 handle_call({pread, Pos, Bytes}, _From, Fd) ->
     {reply, file:pread(Fd, Pos, Bytes), Fd};
-handle_call({pwrite, Pos, Bin}, _From, Fd) ->
-    {reply, file:pwrite(Fd, Pos, Bin), Fd};
-handle_call({expand, Num}, _From, Fd) ->
-    {ok, Pos} = file:position(Fd, eof),
-    {reply, {file:pwrite(Fd, Pos + Num - 1, <<0>>), Pos}, Fd};
 handle_call(bytes, _From, Fd) ->
     {reply, file:position(Fd, eof), Fd};
 handle_call(sync, _From, Fd) ->
@@ -332,16 +235,33 @@
     {ok, Pos} = file:position(Fd, Pos),
     {reply, file:truncate(Fd), Fd};
 handle_call({append_bin, Bin}, _From, Fd) ->
-    Len = size(Bin),
-    Bin2 = <<Len:32, Bin/binary>>,
     {ok, Pos} = file:position(Fd, eof),
-    {reply, {file:pwrite(Fd, Pos, Bin2), Pos}, Fd};
-handle_call({pread_bin, Pos}, _From, Fd) ->
-    {ok, <<TermLen:32>>} = file:pread(Fd, Pos, 4),
-    {ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
-    {reply, {ok, Bin}, Fd}.
-
-
+    Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
+    case file:pwrite(Fd, Pos, Blocks) of
+    ok ->
+        {reply, {ok, Pos}, Fd};
+    Error ->
+        {reply, Error, Fd}
+    end;
+handle_call({write_header, Bin}, _From, Fd) ->
+    {ok, Pos} = file:position(Fd, eof),
+    BinSize = size(Bin),
+    case Pos rem ?SIZE_BLOCK of
+    0 ->
+        io:format("Writing header at block:~p~n", [(Pos div ?SIZE_BLOCK)]),
+        Padding = <<>>;
+    BlockOffset ->
+        io:format("Writing header at block:~p~n", [(Pos div ?SIZE_BLOCK) + 1]),
+        Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>>
+    end,
+    FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(1, Bin)],
+    {reply, file:pwrite(Fd, Pos, FinalBin), Fd};
+handle_call(find_header, _From, Fd) ->
+    {ok, Pos} = file:position(Fd, eof),
+    {reply, find_header(Fd, Pos div ?SIZE_BLOCK), Fd}.
+    
+    
+    
 handle_cast(close, Fd) ->
     {stop,normal,Fd}.
 
@@ -351,3 +271,64 @@
 
 handle_info({'EXIT', _, Reason}, Fd) ->
     {stop, Reason, Fd}.
+
+
+find_header(_Fd, -1) ->
+    no_valid_header;
+find_header(Fd, Block) ->
+    case (catch load_header(Fd, Block)) of
+    {ok, Bin} ->
+        io:format("Found header at block:~p~n", [Block]),
+        {ok, Bin};
+    _Error ->
+        find_header(Fd, Block - 1)
+    end.
+    
+load_header(Fd, Block) ->
+    {ok, <<1>>} = file:pread(Fd, Block*?SIZE_BLOCK, 1),
+    {ok, <<HeaderLen:32/integer>>} = file:pread(Fd, (Block*?SIZE_BLOCK) + 1, 4),
+    TotalBytes = calculate_total_read_len(1, HeaderLen),
+    {ok, <<RawBin:TotalBytes/binary>>} = 
+            file:pread(Fd, (Block*?SIZE_BLOCK) + 5, TotalBytes),
+    io:format("Foo:~p~n", [RawBin]),
+    <<Md5Sig:16/binary, HeaderBin/binary>> = 
+        iolist_to_binary(remove_block_prefixes(1, RawBin)),
+    Md5Sig = erlang:md5(HeaderBin),
+    {ok, HeaderBin}.
+
+
+calculate_total_read_len(BlockOffset, FinalLen) ->
+    case ?SIZE_BLOCK - BlockOffset of
+    BlockLeft when BlockLeft >= FinalLen ->
+        FinalLen;
+    BlockLeft ->
+        FinalLen + ((FinalLen - BlockLeft) div (?SIZE_BLOCK -1)) +
+            if ((FinalLen - BlockLeft) rem (?SIZE_BLOCK -1)) == 0 -> 0;
+                true -> 1 end
+    end.
+
+remove_block_prefixes(_BlockOffset, <<>>) ->
+    [];
+remove_block_prefixes(0, <<_BlockPrefix,Rest/binary>>) ->
+    remove_block_prefixes(1, Rest);
+remove_block_prefixes(BlockOffset, Bin) ->
+    BlockBytesAvailable = ?SIZE_BLOCK - BlockOffset,
+    case size(Bin) of
+    Size when Size > BlockBytesAvailable ->
+        <<DataBlock:BlockBytesAvailable/binary,Rest/binary>> = Bin,
+        [DataBlock | remove_block_prefixes(0, Rest)];
+    _Size ->
+        [Bin]
+    end.
+
+make_blocks(_BlockOffset, <<>>) ->
+    [];
+make_blocks(0, Bin) ->
+    [<<0>> | make_blocks(1, Bin)];
+make_blocks(BlockOffset, Bin) when size(Bin) =< (?SIZE_BLOCK - BlockOffset) ->
+    [Bin];
+make_blocks(BlockOffset, Bin) ->
+    BlockBytes = (?SIZE_BLOCK - BlockOffset),
+    <<BlockBin:BlockBytes/binary, Rest/binary>> = Bin,
+    [BlockBin | make_blocks(0, Rest)].
+

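A worked check of the block arithmetic above (a sketch; these are internal functions, written as if evaluated inside couch_file). At block offset 1 there are 4095 data bytes left in the current 4096-byte block, so a 5000-byte value spills 905 bytes into the next block and picks up that block's one prefix byte:

    5001 = calculate_total_read_len(1, 5000),
    %% a value that fits in the current block costs nothing extra
    10 = calculate_total_read_len(100, 10),
    %% and make_blocks/remove_block_prefixes round-trip the prefixes
    Bin = <<0:(8*5000)>>,
    Bin = iolist_to_binary(remove_block_prefixes(1,
            iolist_to_binary(make_blocks(1, Bin)))).
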
Modified: couchdb/branches/tail_header/src/couchdb/couch_httpd_db.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_httpd_db.erl?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_httpd_db.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_httpd_db.erl Sun May 17 21:31:17 2009
@@ -720,12 +720,7 @@
             % {"Content-Length", integer_to_list(couch_doc:bin_size(Bin))}
             ]),
         couch_doc:bin_foldl(Bin,
-            fun(BinSegment, []) ->
-                send_chunk(Resp, BinSegment),
-                {ok, []}
-            end,
-            []
-        ),
+                fun(BinSegment, _) -> send_chunk(Resp, BinSegment) end, []),
         send_chunk(Resp, "")
     end;
 

Modified: couchdb/branches/tail_header/src/couchdb/couch_stream.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_stream.erl?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_stream.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_stream.erl Sun May 17 21:31:17 2009
@@ -13,144 +13,96 @@
 -module(couch_stream).
 -behaviour(gen_server).
 
--export([test/1]).
--export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1, foldl/5]).
--export([copy/4, copy_to_new_stream/4]).
--export([ensure_buffer/2, set_min_buffer/2]).
+-export([test/0]).
+-export([open/1, close/1, write/2, foldl/4]).
+-export([copy_to_new_stream/3]).
 -export([init/1, terminate/2, handle_call/3]).
 -export([handle_cast/2,code_change/3,handle_info/2]).
 
 -include("couch_db.hrl").
 
--define(FILE_POINTER_BYTES, 8).
--define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)).
-
--define(STREAM_OFFSET_BYTES, 4).
--define(STREAM_OFFSET_BITS, 8*(?STREAM_OFFSET_BYTES)).
-
--define(HUGE_CHUNK, 1000000000). % Huge chuck size when reading all in one go
-
--define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
-
-
--record(write_stream,
-    {fd = 0,
-    current_pos = 0,
-    bytes_remaining = 0,
-    next_alloc = 0,
-    min_alloc = 16#00010000
-    }).
-
 -record(stream,
-    {
-    pid,
-    fd
+    {fd = 0,
+    written_pointers=[],
+    buffer_list = [],
+    buffer_len = 0,
+    max_buffer = 4096,
+    written_len = 0
     }).
 
 
 %%% Interface functions %%%
 
 open(Fd) ->
-    open(nil, Fd).
+    gen_server:start_link(couch_stream, Fd, []).
 
-open(nil, Fd) ->
-    open({0,0}, Fd);
-open(State, Fd) ->
-    {ok, Pid} = gen_server:start_link(couch_stream, {State, Fd}, []),
-    {ok, #stream{pid = Pid, fd = Fd}}.
-
-close(#stream{pid = Pid, fd = _Fd}) ->
+close(Pid) ->
     gen_server:call(Pid, close, infinity).
 
-get_state(#stream{pid = Pid, fd = _Fd}) ->
-    gen_server:call(Pid, get_state, infinity).
-
-ensure_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
-    gen_server:call(Pid, {ensure_buffer, Bytes}).
+copy_to_new_stream(Fd, PosList, DestFd) ->
+    {ok, Dest} = open(DestFd),
+    foldl(Fd, PosList,
+        fun(Bin, _) ->
+            ok = write(Dest, Bin)
+        end, ok),
+    close(Dest).
 
-set_min_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
-    gen_server:call(Pid, {set_min_buffer, Bytes}).
 
-read(#stream{pid = _Pid, fd = Fd}, Sp, Num) ->
-    read(Fd, Sp, Num);
-read(Fd, Sp, Num) ->
-    {ok, RevBin, Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> {ok, [Bin | Acc]} end, []),
-    Bin = list_to_binary(lists:reverse(RevBin)),
-    {ok, Bin, Sp2}.
+foldl(_Fd, [], _Fun, Acc) ->
+    Acc;
+foldl(Fd, [Pos|Rest], Fun, Acc) ->
+    {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+    foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
 
-copy_to_new_stream(Src, Sp, Len, DestFd) ->
-    {ok, Dest} = open(DestFd),
-    ok = set_min_buffer(Dest, 0),
-    {ok, NewSp} = copy(Src, Sp, Len, Dest),
-    close(Dest),
-    {ok, NewSp}.
-
-copy(#stream{pid = _Pid, fd = Fd}, Sp, Len, DestStream) ->
-    copy(Fd, Sp, Len, DestStream);
-copy(Fd, Sp, Len, DestStream) ->
-    {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Len, ?HUGE_CHUNK,
-        fun(Bin, AccPointer) ->
-            {ok, NewPointer} = write(DestStream, Bin),
-            {ok, if AccPointer == null -> NewPointer; true -> AccPointer end}
-        end,
-        null),
-    {ok, NewSp}.
-
-foldl(#stream{pid = _Pid, fd = Fd}, Sp, Num, Fun, Acc) ->
-    foldl(Fd, Sp, Num, Fun, Acc);
-foldl(Fd, Sp, Num, Fun, Acc) ->
-    {ok, _Acc, _Sp} = stream_data(Fd, Sp, Num, ?DEFAULT_STREAM_CHUNK, Fun, Acc).
-
-read_term(#stream{pid = _Pid, fd = Fd}, Sp) ->
-    read_term(Fd, Sp);
-read_term(Fd, Sp) ->
-    {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2}
-        = read(Fd, Sp, ?STREAM_OFFSET_BYTES),
-    {ok, Bin, _Sp3} = read(Fd, Sp2, TermLen),
-    {ok, binary_to_term(Bin)}.
-
-write_term(Stream, Term) ->
-    Bin = term_to_binary(Term),
-    Size = size(Bin),
-    Bin2 = <<Size:(?STREAM_OFFSET_BITS), Bin/binary>>,
-    write(Stream, Bin2).
-
-write(#stream{}, <<>>) ->
-    {ok, {0,0}};
-write(#stream{pid = Pid}, Bin) when is_binary(Bin) ->
+write(_Pid, <<>>) ->
+    ok;
+write(Pid, Bin) ->
     gen_server:call(Pid, {write, Bin}, infinity).
 
 
-init({{Pos, BytesRemaining}, Fd}) ->
-    {ok, #write_stream
-        {fd = Fd,
-        current_pos = Pos,
-        bytes_remaining = BytesRemaining
-        }}.
+init(Fd) ->
+    {ok, #stream{fd = Fd}}.
 
 terminate(_Reason, _Stream) ->
     ok.
 
-handle_call(get_state, _From, Stream) ->
-    #write_stream{current_pos = Pos, bytes_remaining = BytesRemaining} = Stream,
-    {reply, {Pos, BytesRemaining}, Stream};
-handle_call({set_min_buffer, MinBuffer}, _From, Stream) ->
-    {reply, ok, Stream#write_stream{min_alloc = MinBuffer}};
-% set next_alloc if we need more room
-handle_call({ensure_buffer, BufferSizeRequested}, _From, Stream) ->
-    #write_stream{bytes_remaining = BytesRemainingInCurrentBuffer} = Stream,
-    case BytesRemainingInCurrentBuffer < BufferSizeRequested of
-        true ->  NextAlloc = BufferSizeRequested - BytesRemainingInCurrentBuffer;
-        false -> NextAlloc = 0 % enough room in current segment
-    end,
-    {reply, ok, Stream#write_stream{next_alloc = NextAlloc}};
 handle_call({write, Bin}, _From, Stream) ->
-    % ensure init is called first so we can get a pointer to the begining of the binary
-    {ok, Sp, Stream2} = write_data(Stream, Bin),
-    {reply, {ok, Sp}, Stream2};
+    BinSize = iolist_size(Bin),
+    #stream{
+        fd = Fd,
+        written_len = WrittenLen,
+        written_pointers = Written,
+        buffer_len = BufferLen,
+        buffer_list = Buffer,
+        max_buffer = Max} = Stream,
+    if BinSize + BufferLen > Max ->
+        {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer, Bin)),
+        {reply, ok, Stream#stream{
+                        written_len=WrittenLen + BufferLen + BinSize,
+                        written_pointers=[Pos|Written],
+                        buffer_list=[],
+                        buffer_len=0}};
+    true ->
+        {reply, ok, Stream#stream{
+                        buffer_list=[Bin|Buffer],
+                        buffer_len=BufferLen + BinSize}}
+    end;
 handle_call(close, _From, Stream) ->
-    #write_stream{current_pos=Pos, bytes_remaining = BytesRemaining} = Stream,
-    {stop, normal, {ok, {Pos, BytesRemaining}}, Stream}.
+    #stream{
+        fd = Fd,
+        written_len = WrittenLen,
+        written_pointers = Written,
+        buffer_len = BufferLen,
+        buffer_list = Buffer} = Stream,
+    
+    case Buffer of
+    [] ->
+        % written_pointers accumulate newest-first; reverse to write order
+        Result = {lists:reverse(Written), WrittenLen};
+    _ ->
+        {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer)),
+        Result = {lists:reverse([Pos|Written]), WrittenLen + BufferLen}
+    end,
+    {stop, normal, Result, Stream}.
 
 handle_cast(_Msg, State) ->
     {noreply,State}.
@@ -161,100 +113,40 @@
 handle_info(_Info, State) ->
     {noreply, State}.
 
-%%% Internal function %%%
-
-stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
-    {ok, Acc, Sp};
-stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
-    {ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>}
-        = couch_file:pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
-    Sp = {NextPos, NextOffset},
-    % Check NextPos is past current Pos (this is always true in a stream)
-    % Guards against potential infinite loops caused by corruption.
-    case NextPos > Pos of
-        true -> ok;
-        false -> throw({error, stream_corruption})
-    end,
-    stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc);
-stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
-    ReadAmount = lists:min([MaxChunk, Num, Offset]),
-    {ok, Bin} = couch_file:pread(Fd, Pos, ReadAmount),
-    Sp = {Pos + ReadAmount, Offset - ReadAmount},
-    case Fun(Bin, Acc) of
-    {ok, Acc2} ->
-        stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Acc2);
-    {stop, Acc2} ->
-        {ok, Acc2, Sp}
-    end.
-
-write_data(Stream, <<>>) ->
-    {ok, {0,0}, Stream};
-write_data(#write_stream{bytes_remaining=0} = Stream, Bin) ->
-    #write_stream {
-        fd = Fd,
-        current_pos = CurrentPos,
-        next_alloc = NextAlloc,
-        min_alloc = MinAlloc
-        }= Stream,
-
-    NewSize = lists:max([MinAlloc, NextAlloc, size(Bin)]),
-    % no space in the current segment, must alloc a new segment
-    {ok, NewPos} = couch_file:expand(Fd, NewSize + ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
-
-    case CurrentPos of
-    0 ->
-        ok;
-    _ ->
-        ok = couch_file:pwrite(Fd, CurrentPos, <<NewPos:(?FILE_POINTER_BITS), NewSize:(?STREAM_OFFSET_BITS)>>)
-    end,
-    Stream2 = Stream#write_stream{
-        current_pos=NewPos,
-        bytes_remaining=NewSize,
-        next_alloc=0},
-    write_data(Stream2, Bin);
-write_data(#write_stream{fd=Fd, current_pos=Pos, bytes_remaining=BytesRemaining} = Stream, Bin) ->
-    BytesToWrite = lists:min([size(Bin), BytesRemaining]),
-    {WriteBin, Rest} = split_binary(Bin, BytesToWrite),
-    ok = couch_file:pwrite(Fd, Pos, WriteBin),
-    Stream2 = Stream#write_stream{
-        bytes_remaining=BytesRemaining - BytesToWrite,
-        current_pos=Pos + BytesToWrite
-        },
-    {ok, _, Stream3} = write_data(Stream2, Rest),
-    {ok, {Pos, BytesRemaining}, Stream3}.
-
 
 
 %%% Tests %%%
 
-
-test(Term) ->
-    {ok, Fd} = couch_file:open("foo", [write]),
-    {ok, Stream} = open({0,0}, Fd),
-    {ok, Pos} = write_term(Stream, Term),
-    {ok, Pos2} = write_term(Stream, {Term, Term}),
-    close(Stream),
+read_all(Fd, PosList) ->
+    iolist_to_binary(foldl(Fd, PosList,
+        fun(Bin, Acc) ->
+            [Acc, Bin]
+        end, [])).
+
+
+test() ->
+    {ok, Fd} = couch_file:open("foo", [create,overwrite]),
+    ok = couch_file:write_header(Fd, {howdy, howdy}),
+    Bin = <<"damienkatz">>,
+    {ok, Pos} = couch_file:append_binary(Fd, Bin),
+    {ok, Bin} = couch_file:pread_binary(Fd, Pos),
+    {ok, {howdy, howdy}} = couch_file:read_header(Fd),
+    ok = couch_file:write_header(Fd, {foo, foo}),
+    {ok, {foo, foo}} = couch_file:read_header(Fd),
+    
+    {ok, Stream} = open(Fd),
+    ok = write(Stream, <<"food">>),
+    ok = write(Stream, <<"foob">>),
+    {PosList, 8} = close(Stream),
+    <<"foodfoob">> = read_all(Fd, PosList),
+    {ok, Stream2} = open(Fd),
+    OneBits = <<1:(8*10)>>,
+    ZeroBits = <<0:(8*10)>>,
+    ok = write(Stream2, OneBits),
+    ok = write(Stream2, ZeroBits),
+    {PosList2, 20} = close(Stream2),
+    AllBits = iolist_to_binary([OneBits,ZeroBits]),
+    AllBits = read_all(Fd, PosList2),
     couch_file:close(Fd),
-    {ok, Fd2} = couch_file:open("foo", [read, write]),
-    {ok, Stream2} = open({0,0}, Fd2),
-    {ok, Term1} = read_term(Fd2, Pos),
-    io:format("Term1: ~w ~n",[Term1]),
-    {ok, Term2} = read_term(Fd2, Pos2),
-    io:format("Term2: ~w ~n",[Term2]),
-    {ok, PointerList} = deep_write_test(Stream2, Term, 1000, []),
-    deep_read_test(Fd2, PointerList),
-    close(Stream2),
-    couch_file:close(Fd2).
+    PosList2.
 
-deep_read_test(_Fd, []) ->
-    ok;
-deep_read_test(Fd, [Pointer | RestPointerList]) ->
-    {ok, _Term} = read_term(Fd, Pointer),
-    deep_read_test(Fd, RestPointerList).
-
-deep_write_test(_Stream, _Term, 0, PointerList) ->
-    {ok, PointerList};
-deep_write_test(Stream, Term, N, PointerList) ->
-    WriteList = lists:duplicate(random:uniform(N), Term),
-    {ok, Pointer} = write_term(Stream, WriteList),
-    deep_write_test(Stream, Term, N-1, [Pointer | PointerList]).

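One consequence of the buffering above, shown as a hedged sketch (Fd is an already-opened couch_file): writes smaller than max_buffer accumulate in memory, so many small writes can still end up as a single file append and a one-element position list:

    {ok, Stream} = couch_stream:open(Fd),
    ok = couch_stream:write(Stream, <<"aaa">>),
    ok = couch_stream:write(Stream, <<"bbb">>),
    {[_SinglePos], 6} = couch_stream:close(Stream).
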
Modified: couchdb/branches/tail_header/src/couchdb/couch_view_group.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_view_group.erl?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_view_group.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_view_group.erl Sun May 17 21:31:17 2009
@@ -205,7 +205,7 @@
     if CommittedSeq >= Group#group.current_seq ->
         % save the header
         Header = {Group#group.sig, get_index_header_data(Group)},
-        ok = couch_file:write_header(Group#group.fd, <<$r, $c, $k, 0>>, Header),
+        ok = couch_file:write_header(Group#group.fd, Header),
         {noreply, State#group_state{waiting_commit=false}};
     true ->
         % We can't commit the header because the database seq that's fully
@@ -313,7 +313,7 @@
             if ForceReset ->
                 {ok, reset_file(Db, Fd, DbName, Group)};
             true ->
-                case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
+                case (catch couch_file:read_header(Fd)) of
                 {ok, {Sig, HeaderInfo}} ->
                     % sigs match!
                     {ok, init_group(Db, Fd, Group, HeaderInfo)};
@@ -417,7 +417,7 @@
 reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
     ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
     ok = couch_file:truncate(Fd, 0),
-    ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),
+    ok = couch_file:write_header(Fd, {Sig, nil}),
     init_group(Db, Fd, reset_group(Group), nil).
 
 delete_index_file(RootDir, DbName, GroupId) ->
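
With the four-byte magic prefixes gone, file identification rests on the MD5 check in couch_file:read_header/1 plus the signature stored inside the header term itself; a hedged restatement of the match used above:

    case (catch couch_file:read_header(Fd)) of
    {ok, {Sig, HeaderInfo}} ->
        % Sig is already bound to this group's signature, so the
        % pattern only matches a header written for this view group
        {ok, init_group(Db, Fd, Group, HeaderInfo)};
    _ ->
        % anything else (no header, checksum failure, other sig) resets
        {ok, reset_file(Db, Fd, DbName, Group)}
    end.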