You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2009/05/21 21:01:11 UTC

svn commit: r777222 - in /couchdb/branches/tail_header/src/couchdb: couch_db.erl couch_db.hrl couch_db_updater.erl couch_doc.erl couch_file.erl couch_stream.erl couch_view_group.erl

Author: damien
Date: Thu May 21 19:01:10 2009
New Revision: 777222

URL: http://svn.apache.org/viewvc?rev=777222&view=rev
Log:
Added code to serve and upgrade old style databases with zero downtime.

Modified:
    couchdb/branches/tail_header/src/couchdb/couch_db.erl
    couchdb/branches/tail_header/src/couchdb/couch_db.hrl
    couchdb/branches/tail_header/src/couchdb/couch_db_updater.erl
    couchdb/branches/tail_header/src/couchdb/couch_doc.erl
    couchdb/branches/tail_header/src/couchdb/couch_file.erl
    couchdb/branches/tail_header/src/couchdb/couch_stream.erl
    couchdb/branches/tail_header/src/couchdb/couch_view_group.erl

Modified: couchdb/branches/tail_header/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_db.erl?rev=777222&r1=777221&r2=777222&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_db.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_db.erl Thu May 21 19:01:10 2009
@@ -24,7 +24,7 @@
 -export([increment_update_seq/1,get_purge_seq/1,purge_docs/2,get_last_purged/1]).
 -export([start_link/3,open_doc_int/3,set_admins/2,get_admins/1,ensure_full_commit/1]).
 -export([init/1,terminate/2,handle_call/3,handle_cast/2,code_change/3,handle_info/2]).
--export([changes_since/5]).
+-export([changes_since/5,read_doc/2]).
 
 -include("couch_db.hrl").
 
@@ -566,6 +566,11 @@
     % already written to our file, nothing to write
     {Fd, StreamPointer, Len};
   
+flush_binary(Fd, {OtherFd, StreamPointer, Len}) when is_tuple(StreamPointer) ->
+    {NewStreamData, Len} = 
+            couch_stream:old_copy_to_new_stream(OtherFd, StreamPointer, Len, Fd),
+    {Fd, NewStreamData, Len};
+
 flush_binary(Fd, {OtherFd, StreamPointer, Len}) ->
     {NewStreamData, Len} = 
             couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
@@ -811,6 +816,11 @@
         end
     end.
 
+read_doc(Fd, Pos) when is_integer(Pos) ->
+    couch_file:pread_term(Fd, Pos);
+read_doc(Fd, OldStyleStreamPointer) ->
+    couch_stream:old_read_term(Fd, OldStyleStreamPointer).
+
 
 doc_to_tree(#doc{revs={Start, RevIds}}=Doc) ->
     [Tree] = doc_to_tree_simple(Doc, lists:reverse(RevIds)),
@@ -829,7 +839,7 @@
     nil ->
         {[], []};
     _ ->
-        {ok, {BodyData0, BinValues0}} =couch_file:pread_term(Fd, Bp),
+        {ok, {BodyData0, BinValues0}} = read_doc(Fd, Bp),
         {BodyData0,
             [{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]}
     end,

Modified: couchdb/branches/tail_header/src/couchdb/couch_db.hrl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_db.hrl?rev=777222&r1=777221&r2=777222&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_db.hrl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_db.hrl Thu May 21 19:01:10 2009
@@ -109,8 +109,7 @@
 % if the disk revision is incremented, then new upgrade logic will need to be
 % added to couch_db_updater:init_db.
 
--define(DISK_VERSION_0_9, 1).
--define(LATEST_DISK_VERSION, 2).
+-define(LATEST_DISK_VERSION, 3).
 
 -record(db_header,
     {disk_version = ?LATEST_DISK_VERSION,  

Modified: couchdb/branches/tail_header/src/couchdb/couch_db_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_db_updater.erl?rev=777222&r1=777221&r2=777222&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_db_updater.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_db_updater.erl Thu May 21 19:01:10 2009
@@ -29,6 +29,7 @@
         % delete any old compaction files that might be hanging around
         file:delete(Filepath ++ ".compact");
     false ->
+        ok = couch_file:upgrade_old_header(Fd, <<$g, $m, $k, 0>>),
         case couch_config:get("couchdb", "sync_on_open", "true") of
         "true" ->
             ok = couch_file:sync(Fd);
@@ -287,13 +288,16 @@
 less_docid({}, _) -> false; % {} -> special key sorts after all
 less_docid(A, B) -> A < B.
 
+
 init_db(DbName, Filepath, Fd, Header0) ->
     case element(2, Header0) of
-    ?DISK_VERSION_0_9 -> ok; % no problem, all records upgrade on the fly
+    1 -> ok; % 0.9
+    2 -> ok; % post 0.9 and pre 0.10
     ?LATEST_DISK_VERSION -> ok;
     _ -> throw({database_disk_version_error, "Incorrect disk header version"})
     end,
-    Header = simple_upgrade_record(Header0, #db_header{}),
+    Header1 = Header0#db_header{unused = 0}, % used in versions 1 and 2, but not later
+    Header = simple_upgrade_record(Header1, #db_header{}),
     Less = fun less_docid/2,
             
     {ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
@@ -584,12 +588,17 @@
         Db#db{waiting_delayed_commit=nil,header=Header2,committed_update_seq=Db#db.update_seq}
     end.
 
+
 copy_raw_doc(SrcFd, SrcSp, DestFd) ->
-    {ok, {BodyData, BinInfos}} = couch_file:pread_term(SrcFd, SrcSp),
+    {ok, {BodyData, BinInfos}} = couch_db:read_doc(SrcFd, SrcSp),
     % copy the bin values
-    NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
-        {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
-        {Name, {Type, NewBinSp, Len}}
+    NewBinInfos = lists:map(
+        fun({Name, {Type, BinSp, Len}}) when is_tuple(BinSp) orelse BinSp == null ->
+            {NewBinSp, Len} = couch_stream:old_copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
+            {Name, {Type, NewBinSp, Len}};
+        ({Name, {Type, BinSp, Len}}) ->
+            {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
+            {Name, {Type, NewBinSp, Len}}
         end, BinInfos),
     % now write the document summary
     {ok, Sp} = couch_file:append_term(DestFd, {BodyData, NewBinInfos}),

Modified: couchdb/branches/tail_header/src/couchdb/couch_doc.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_doc.erl?rev=777222&r1=777221&r2=777222&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_doc.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_doc.erl Thu May 21 19:01:10 2009
@@ -251,6 +251,8 @@
 
 bin_foldl(Bin, Fun, Acc) when is_binary(Bin) ->
     Fun(Bin, Acc);
+bin_foldl({Fd, Sp, Len}, Fun, Acc) when is_tuple(Sp) orelse Sp == null ->
+    couch_stream:old_foldl(Fd, Sp, Len, Fun, Acc);
 bin_foldl({Fd, Sp, _Len}, Fun, Acc) ->
     couch_stream:foldl(Fd, Sp, Fun, Acc).
 

Modified: couchdb/branches/tail_header/src/couchdb/couch_file.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_file.erl?rev=777222&r1=777221&r2=777222&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_file.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_file.erl Thu May 21 19:01:10 2009
@@ -17,9 +17,14 @@
 
 -define(SIZE_BLOCK, 4096).
 
--export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2]).
+-record(file, {
+    fd,
+    tail_append_begin=0
+    }).
+
+-export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2,old_pread/3]).
 -export([append_term/2, pread_term/2, pread_iolist/2, write_header/2]).
--export([pread_binary/2, read_header/1, truncate/2]).
+-export([pread_binary/2, read_header/1, truncate/2, upgrade_old_header/2]).
 -export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
 
 %%----------------------------------------------------------------------
@@ -61,7 +66,7 @@
 %%----------------------------------------------------------------------
 
 append_term(Fd, Term) ->
-    append_binary(Fd, term_to_binary(Term)).
+    append_binary(Fd, term_to_binary(Term, [compressed])).
 
 
 %%----------------------------------------------------------------------
@@ -84,6 +89,7 @@
 %%  or {error, Reason}.
 %%----------------------------------------------------------------------
 
+    
 pread_term(Fd, Pos) ->
     {ok, Bin} = pread_binary(Fd, Pos),
     {ok, binary_to_term(Bin)}.
@@ -111,8 +117,13 @@
 read_raw_iolist(Fd, Pos, Len) ->
     BlockOffset = Pos rem ?SIZE_BLOCK,
     TotalBytes = calculate_total_read_len(BlockOffset, Len),
-    {ok, <<RawBin:TotalBytes/binary>>} = gen_server:call(Fd, {pread, Pos, TotalBytes}, infinity),
-    {ok, remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes}.
+    {ok, <<RawBin:TotalBytes/binary>>, HasPrefixes} = gen_server:call(Fd, {pread, Pos, TotalBytes}, infinity),
+    if HasPrefixes ->
+        {ok, remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes};
+    true ->
+        <<ReturnBin:Len/binary, _/binary>> = RawBin,
+        {ok, [ReturnBin], Pos + Len}
+    end.
 
 %%----------------------------------------------------------------------
 %% Purpose: The length of a file, in bytes.
@@ -151,6 +162,14 @@
     catch unlink(Fd),
     Result.
 
+old_pread(Fd, Pos, Len) ->
+    {ok, <<RawBin:Len/binary>>, false} = gen_server:call(Fd, {pread, Pos, Len}, infinity),
+    {ok, RawBin}.
+
+upgrade_old_header(Fd, Sig) ->
+    gen_server:call(Fd, {upgrade_old_header, Sig}, infinity).
+
+
 read_header(Fd) ->
     case gen_server:call(Fd, find_header, infinity) of
     {ok, Bin} ->
@@ -194,7 +213,7 @@
                     ok = file:sync(Fd),
                     couch_stats_collector:track_process_count(
                             {couchdb, open_os_files}),
-                    {ok, Fd};
+                    {ok, #file{fd=Fd}};
                 false ->
                     ok = file:close(Fd),
                     init_status_error(ReturnPid, Ref, file_exists)
@@ -202,7 +221,7 @@
             false ->
                 couch_stats_collector:track_process_count(
                         {couchdb, open_os_files}),
-                {ok, Fd}
+                {ok, #file{fd=Fd}}
             end;
         Error ->
             init_status_error(ReturnPid, Ref, Error)
@@ -214,7 +233,7 @@
             {ok, Fd} = file:open(Filepath, [read, write, raw, binary]),
             ok = file:close(Fd_Read),
             couch_stats_collector:track_process_count({couchdb, open_os_files}),
-            {ok, Fd};
+            {ok, #file{fd=Fd}};
         Error ->
             init_status_error(ReturnPid, Ref, Error)
         end
@@ -225,25 +244,26 @@
     ok.
 
 
-handle_call({pread, Pos, Bytes}, _From, Fd) ->
-    {reply, file:pread(Fd, Pos, Bytes), Fd};
-handle_call(bytes, _From, Fd) ->
-    {reply, file:position(Fd, eof), Fd};
-handle_call(sync, _From, Fd) ->
-    {reply, file:sync(Fd), Fd};
-handle_call({truncate, Pos}, _From, Fd) ->
+handle_call({pread, Pos, Bytes}, _From, #file{fd=Fd,tail_append_begin=TailAppendBegin}=File) ->
+    {ok, Bin} = file:pread(Fd, Pos, Bytes),
+    {reply, {ok, Bin, Pos > TailAppendBegin}, File};
+handle_call(bytes, _From, #file{fd=Fd}=File) ->
+    {reply, file:position(Fd, eof), File};
+handle_call(sync, _From, #file{fd=Fd}=File) ->
+    {reply, file:sync(Fd), File};
+handle_call({truncate, Pos}, _From, #file{fd=Fd}=File) ->
     {ok, Pos} = file:position(Fd, Pos),
-    {reply, file:truncate(Fd), Fd};
-handle_call({append_bin, Bin}, _From, Fd) ->
+    {reply, file:truncate(Fd), File};
+handle_call({append_bin, Bin}, _From, #file{fd=Fd}=File) ->
     {ok, Pos} = file:position(Fd, eof),
     Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
     case file:pwrite(Fd, Pos, Blocks) of
     ok ->
-        {reply, {ok, Pos}, Fd};
+        {reply, {ok, Pos}, File};
     Error ->
-        {reply, Error, Fd}
+        {reply, Error, File}
     end;
-handle_call({write_header, Bin}, _From, Fd) ->
+handle_call({write_header, Bin}, _From, #file{fd=Fd}=File) ->
     {ok, Pos} = file:position(Fd, eof),
     BinSize = size(Bin),
     case Pos rem ?SIZE_BLOCK of
@@ -253,12 +273,131 @@
         Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>>
     end,
     FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(1, [Bin])],
-    {reply, file:pwrite(Fd, Pos, FinalBin), Fd};
-handle_call(find_header, _From, Fd) ->
+    {reply, file:pwrite(Fd, Pos, FinalBin), File};
+
+
+handle_call({upgrade_old_header, Prefix}, _From, #file{fd=Fd}=File) ->
+    case (catch read_old_header(Fd, Prefix)) of
+    {ok, Header} ->
+        {ok, TailAppendBegin} = file:position(Fd, eof),
+        Bin = term_to_binary(Header),
+        Md5 = erlang:md5(Bin),
+        % now we assemble the final header binary and write to disk
+        FinalBin = <<Md5/binary, Bin/binary>>,
+        {reply, ok, _} = handle_call({write_header, FinalBin}, ok, File),
+        ok = write_old_header(Fd, <<"upgraded">>, TailAppendBegin),
+        {reply, ok, File#file{tail_append_begin=TailAppendBegin}};
+    _Error ->
+        case (catch read_old_header(Fd, <<"upgraded">>)) of
+        {ok, TailAppendBegin} ->
+            {reply, ok, File#file{tail_append_begin = TailAppendBegin}};
+        _Error2 ->
+            {reply, ok, File}
+        end
+    end;
+
+
+handle_call(find_header, _From, #file{fd=Fd}=File) ->
     {ok, Pos} = file:position(Fd, eof),
-    {reply, find_header(Fd, Pos div ?SIZE_BLOCK), Fd}.
+    {reply, find_header(Fd, Pos div ?SIZE_BLOCK), File}.
+        
+
+-define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
+
+read_old_header(Fd, Prefix) ->
+    {ok, Bin} = file:pread(Fd, 0, 2*(?HEADER_SIZE)),
+    <<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
+    Result =
+    % read the first header
+    case extract_header(Prefix, Bin1) of
+    {ok, Header1} ->
+        case extract_header(Prefix, Bin2) of
+        {ok, Header2} ->
+            case Header1 == Header2 of
+            true ->
+                % Everything is completely normal!
+                {ok, Header1};
+            false ->
+                % To get here we must have two different header versions with signatures intact.
+                % It's weird but possible (a commit failure right at the 2k boundary). Log it and take the first.
+                ?LOG_INFO("Header version differences.~nPrimary Header: ~p~nSecondary Header: ~p", [Header1, Header2]),
+                {ok, Header1}
+            end;
+        Error ->
+            % error reading second header. It's ok, but log it.
+            ?LOG_INFO("Secondary header corruption (error: ~p). Using primary header.", [Error]),
+            {ok, Header1}
+        end;
+    Error ->
+        % error reading primary header
+        case extract_header(Prefix, Bin2) of
+        {ok, Header2} ->
+            % log corrupt primary header. It's ok since the secondary is still good.
+            ?LOG_INFO("Primary header corruption (error: ~p). Using secondary header.", [Error]),
+            {ok, Header2};
+        _ ->
+            % error reading secondary header too
+            % return the error, no need to log anything as the caller will be responsible for dealing with the error.
+            Error
+        end
+    end,
+    case Result of
+    {ok, {pointer_to_header_data, Ptr}} ->
+        pread_term(Fd, Ptr);
+    _ ->
+        Result
+    end.
     
+extract_header(Prefix, Bin) ->
+    SizeOfPrefix = size(Prefix),
+    SizeOfTermBin = ?HEADER_SIZE -
+                    SizeOfPrefix -
+                    16,     % md5 sig
+
+    <<HeaderPrefix:SizeOfPrefix/binary, TermBin:SizeOfTermBin/binary, Sig:16/binary>> = Bin,
+
+    % check the header prefix
+    case HeaderPrefix of
+    Prefix ->
+        % check the integrity signature
+        case erlang:md5(TermBin) == Sig of
+        true ->
+            Header = binary_to_term(TermBin),
+            {ok, Header};
+        false ->
+            header_corrupt
+        end;
+    _ ->
+        unknown_header_type
+    end.
     
+
+
+write_old_header(Fd, Prefix, Data) ->
+    TermBin = term_to_binary(Data),
+    % the size of all the bytes written to the header, including the md5 signature (16 bytes)
+    FilledSize = size(Prefix) + size(TermBin) + 16,
+    {TermBin2, FilledSize2} =
+    case FilledSize > ?HEADER_SIZE of
+    true ->
+        % too big!
+        {ok, Pos} = append_binary(Fd, TermBin),
+        PtrBin = term_to_binary({pointer_to_header_data, Pos}),
+        {PtrBin, size(Prefix) + size(PtrBin) + 16};
+    false ->
+        {TermBin, FilledSize}
+    end,
+    ok = file:sync(Fd),
+    % pad out the header with zeros, then take the md5 hash
+    PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
+    Sig = erlang:md5([TermBin2, PadZeros]),
+    % now we assemble the final header binary and write to disk
+    WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
+    ?HEADER_SIZE = size(WriteBin), % sanity check
+    DblWriteBin = [WriteBin, WriteBin],
+    ok = file:pwrite(Fd, 0, DblWriteBin),
+    ok = file:sync(Fd).
+
     
 handle_cast(close, Fd) ->
     {stop,normal,Fd}.

Modified: couchdb/branches/tail_header/src/couchdb/couch_stream.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_stream.erl?rev=777222&r1=777221&r2=777222&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_stream.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_stream.erl Thu May 21 19:01:10 2009
@@ -13,9 +13,20 @@
 -module(couch_stream).
 -behaviour(gen_server).
 
+
+-define(FILE_POINTER_BYTES, 8).
+-define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)).
+
+-define(STREAM_OFFSET_BYTES, 4).
+-define(STREAM_OFFSET_BITS, 8*(?STREAM_OFFSET_BYTES)).
+
+-define(HUGE_CHUNK, 1000000000). % Huge chunk size when reading all in one go
+
+-define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
+
 -export([test/0]).
--export([open/1, close/1, write/2, foldl/4]).
--export([copy_to_new_stream/3]).
+-export([open/1, close/1, write/2, foldl/4, old_foldl/5,old_copy_to_new_stream/4]).
+-export([copy_to_new_stream/3,old_read_term/2]).
 -export([init/1, terminate/2, handle_call/3]).
 -export([handle_cast/2,code_change/3,handle_info/2]).
 
@@ -47,6 +58,20 @@
         end, ok),
     close(Dest).
 
+old_copy_to_new_stream(Fd, Pos, Len, DestFd) ->
+    {ok, Dest} = open(DestFd),
+    old_foldl(Fd, Pos, Len,
+        fun(Bin, _) ->
+            ok = write(Dest, Bin)
+        end, ok),
+    close(Dest).
+
+
+
+old_foldl(_Fd, null, 0, _Fun, Acc) ->
+    Acc;
+old_foldl(Fd, OldPointer, Len, Fun, Acc) when is_tuple(OldPointer)->
+    old_stream_data(Fd, OldPointer, Len, ?DEFAULT_STREAM_CHUNK, Fun, Acc).
 
 foldl(_Fd, [], _Fun, Acc) ->
     Acc;
@@ -112,6 +137,39 @@
 
 handle_info(_Info, State) ->
     {noreply, State}.
+    
+
+
+
+old_read_term(Fd, Sp) ->
+    {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2}
+        = old_read(Fd, Sp, ?STREAM_OFFSET_BYTES),
+    {ok, Bin, _Sp3} = old_read(Fd, Sp2, TermLen),
+    {ok, binary_to_term(Bin)}.
+
+old_read(Fd, Sp, Num) ->
+    {ok, RevBin, Sp2} = old_stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> [Bin | Acc] end, []),
+    Bin = list_to_binary(lists:reverse(RevBin)),
+    {ok, Bin, Sp2}.
+
+old_stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
+    {ok, Acc, Sp};
+old_stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
+    {ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>}
+        = couch_file:old_pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
+    Sp = {NextPos, NextOffset},
+    % Check NextPos is past current Pos (this is always true in a stream)
+    % Guards against potential infinite loops caused by corruption.
+    case NextPos > Pos of
+        true -> ok;
+        false -> throw({error, stream_corruption})
+    end,
+    old_stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc);
+old_stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
+    ReadAmount = lists:min([MaxChunk, Num, Offset]),
+    {ok, Bin} = couch_file:old_pread(Fd, Pos, ReadAmount),
+    Sp = {Pos + ReadAmount, Offset - ReadAmount},
+    old_stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Fun(Bin, Acc)).
 
 
 

Modified: couchdb/branches/tail_header/src/couchdb/couch_view_group.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_view_group.erl?rev=777222&r1=777221&r2=777222&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_view_group.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_view_group.erl Thu May 21 19:01:10 2009
@@ -261,7 +261,7 @@
 handle_info({'EXIT', _FromPid, normal}, State) ->
     {noreply, State};
     
-handle_info({'EXIT', FromPid, {{nocatch, Reason}, Trace}}, State) ->
+handle_info({'EXIT', FromPid, {{nocatch, Reason}, _Trace}}, State) ->
     ?LOG_DEBUG("Uncaught throw() in linked pid: ~p", [{FromPid, Reason}]),
     {stop, Reason, State};
 
@@ -313,6 +313,7 @@
             if ForceReset ->
                 {ok, reset_file(Db, Fd, DbName, Group)};
             true ->
+                ok = couch_file:upgrade_old_header(Fd, <<$r, $c, $k, 0>>),
                 case (catch couch_file:read_header(Fd)) of
                 {ok, {Sig, HeaderInfo}} ->
                     % sigs match!