You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@couchdb.apache.org by da...@apache.org on 2009/05/17 23:31:18 UTC
svn commit: r775761 - in /couchdb/branches/tail_header: ./ share/www/script/test/ src/couchdb/
Author: damien
Date: Sun May 17 21:31:17 2009
New Revision: 775761
URL: http://svn.apache.org/viewvc?rev=775761&view=rev
Log:
Write database headers by appending them to the tail of the file. Replication doesn't work yet; it may need file-format upgrade code.
Added:
couchdb/branches/tail_header/
- copied from r775759, couchdb/trunk/
Modified:
couchdb/branches/tail_header/share/www/script/test/basics.js
couchdb/branches/tail_header/src/couchdb/couch_db.erl
couchdb/branches/tail_header/src/couchdb/couch_db.hrl
couchdb/branches/tail_header/src/couchdb/couch_db_updater.erl
couchdb/branches/tail_header/src/couchdb/couch_doc.erl
couchdb/branches/tail_header/src/couchdb/couch_file.erl
couchdb/branches/tail_header/src/couchdb/couch_httpd_db.erl
couchdb/branches/tail_header/src/couchdb/couch_stream.erl
couchdb/branches/tail_header/src/couchdb/couch_view_group.erl
Modified: couchdb/branches/tail_header/share/www/script/test/basics.js
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/share/www/script/test/basics.js?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/share/www/script/test/basics.js (original)
+++ couchdb/branches/tail_header/share/www/script/test/basics.js Sun May 17 21:31:17 2009
@@ -133,7 +133,7 @@
// make sure we can still open the old rev of the deleted doc
T(db.open(existingDoc._id, {rev: existingDoc._rev}) != null);
-
+ console.log("db.info: " + db.info.update_seq),
// make sure restart works
T(db.ensureFullCommit().ok);
restartServer();
Modified: couchdb/branches/tail_header/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_db.erl?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_db.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_db.erl Sun May 17 21:31:17 2009
@@ -154,7 +154,7 @@
purge_docs(#db{update_pid=UpdatePid}, IdsRevs) ->
gen_server:call(UpdatePid, {purge_docs, IdsRevs}).
-get_committed_update_seq(#db{header=#db_header{update_seq=Seq}}) ->
+get_committed_update_seq(#db{committed_update_seq=Seq}) ->
Seq.
get_update_seq(#db{update_seq=Seq})->
@@ -566,92 +566,50 @@
{Fd, StreamPointer, Len};
flush_binary(Fd, {OtherFd, StreamPointer, Len}) ->
- with_stream(Fd, fun(OutputStream) ->
- % written to a different file (or a closed file
- % instance, which will cause an error)
- ok = couch_stream:set_min_buffer(OutputStream, Len),
- {ok, {NewStreamPointer, Len}, _EndSp} =
- couch_stream:foldl(OtherFd, StreamPointer, Len,
- fun(Bin, {BeginPointer, SizeAcc}) ->
- {ok, Pointer} = couch_stream:write(OutputStream, Bin),
- case SizeAcc of
- 0 -> % this was the first write, record the pointer
- {ok, {Pointer, size(Bin)}};
- _ ->
- {ok, {BeginPointer, SizeAcc + size(Bin)}}
- end
- end,
- {{0,0}, 0}),
- {Fd, NewStreamPointer, Len}
- end);
+ {NewStreamData, Len} =
+ couch_stream:copy_to_new_stream(OtherFd, StreamPointer, Fd),
+ {OtherFd, NewStreamData, Len};
flush_binary(Fd, Bin) when is_binary(Bin) ->
- with_stream(Fd, fun(OutputStream) ->
- ok = couch_stream:set_min_buffer(OutputStream, size(Bin)),
- {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
- {Fd, StreamPointer, size(Bin)}
+ with_stream(Fd, fun(OutputStream) ->
+ couch_stream:write(OutputStream, Bin)
end);
flush_binary(Fd, {StreamFun, undefined}) when is_function(StreamFun) ->
- % max_attachment_chunk_size control the max we buffer in memory
- MaxChunkSize = list_to_integer(couch_config:get("couchdb",
- "max_attachment_chunk_size","4294967296")),
with_stream(Fd, fun(OutputStream) ->
+ io:format("OutputStream:~p~n", [OutputStream]),
% StreamFun(MaxChunkSize, WriterFun) must call WriterFun
- % once for each chunk of the attachment.
- WriterFun = make_writer_fun(OutputStream),
- {ok, {TotalLength, NewStreamPointer}} =
- StreamFun(MaxChunkSize, WriterFun, {0, nil}),
- {Fd, NewStreamPointer, TotalLength}
+ % once for each chunk of the attachment,
+ StreamFun(4096,
+ % WriterFun({Length, Binary}, State)
+ % WriterFun({0, _Footers}, State)
+ % Called with Length == 0 on the last time.
+ % WriterFun returns NewState.
+ fun({0, _Footers}, _) ->
+ ok;
+ ({_Length, Bin}, _) ->
+ couch_stream:write(OutputStream, Bin)
+ end, ok)
end);
flush_binary(Fd, {Fun, Len}) when is_function(Fun) ->
with_stream(Fd, fun(OutputStream) ->
- ok = couch_stream:set_min_buffer(OutputStream, Len),
- {ok, StreamPointer} =
- write_streamed_attachment(OutputStream, Fun, Len, nil),
- {Fd, StreamPointer, Len}
+ write_streamed_attachment(OutputStream, Fun, Len)
end).
with_stream(Fd, Fun) ->
{ok, OutputStream} = couch_stream:open(Fd),
- Result = Fun(OutputStream),
- couch_stream:close(OutputStream),
- Result.
-
-make_writer_fun(Stream) ->
- % WriterFun({Length, Binary}, State)
- % WriterFun({0, _Footers}, State)
- % Called with Length == 0 on the last time.
- % WriterFun returns NewState.
- fun
- ({0, _Footers}, {FinalLen, SpFin}) ->
- % last block, return the final tuple
- {ok, {FinalLen, SpFin}};
- ({Length, Bin}, {Total, nil}) ->
- % save StreamPointer
- ok = couch_stream:set_min_buffer(Stream, Length),
- {ok, StreamPointer} = couch_stream:write(Stream, Bin),
- {Total+Length, StreamPointer};
- ({Length, Bin}, {Total, SpAcc}) ->
- % write the Bin to disk
- ok = couch_stream:set_min_buffer(Stream, Length),
- {ok, _Sp} = couch_stream:write(Stream, Bin),
- {Total+Length, SpAcc}
- end.
+ Fun(OutputStream),
+ {StreamInfo, Len} = couch_stream:close(OutputStream),
+ {Fd, StreamInfo, Len}.
+
-write_streamed_attachment(_Stream, _F, 0, SpAcc) ->
- {ok, SpAcc};
-write_streamed_attachment(Stream, F, LenLeft, nil) ->
- Bin = F(),
- TruncatedBin = check_bin_length(LenLeft, Bin),
- {ok, SpAcc} = couch_stream:write(Stream, TruncatedBin),
- write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc);
-write_streamed_attachment(Stream, F, LenLeft, SpAcc) ->
+write_streamed_attachment(_Stream, _F, 0) ->
+ ok;
+write_streamed_attachment(Stream, F, LenLeft) ->
Bin = F(),
- TruncatedBin = check_bin_length(LenLeft, Bin),
- {ok, _} = couch_stream:write(Stream, TruncatedBin),
- write_streamed_attachment(Stream, F, LenLeft - size(TruncatedBin), SpAcc).
+ ok = couch_stream:write(Stream, check_bin_length(LenLeft, Bin)),
+ write_streamed_attachment(Stream, F, LenLeft - size(Bin)).
%% on rare occasions ibrowse seems to process a chunked response incorrectly
%% and include an extra "\r" in the last chunk. This code ensures that we
@@ -865,14 +823,13 @@
[{RevId, ?REV_MISSING, doc_to_tree_simple(Doc, Rest)}].
-make_doc(#db{fd=Fd}=Db, Id, Deleted, Bp, RevisionPath) ->
+make_doc(#db{fd=Fd}, Id, Deleted, Bp, RevisionPath) ->
{BodyData, BinValues} =
case Bp of
nil ->
{[], []};
_ ->
- {ok, {BodyData0, BinValues0}} =
- couch_stream:read_term( Db#db.summary_stream, Bp),
+ {ok, {BodyData0, BinValues0}} =couch_file:pread_term(Fd, Bp),
{BodyData0,
[{Name,{Type,{Fd,Sp,Len}}} || {Name,{Type,Sp,Len}} <- BinValues0]}
end,
Modified: couchdb/branches/tail_header/src/couchdb/couch_db.hrl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_db.hrl?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_db.hrl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_db.hrl Sun May 17 21:31:17 2009
@@ -115,7 +115,7 @@
-record(db_header,
{disk_version = ?LATEST_DISK_VERSION,
update_seq = 0,
- summary_stream_state = nil,
+ unused,
fulldocinfo_by_id_btree_state = nil,
docinfo_by_seq_btree_state = nil,
local_docs_btree_state = nil,
@@ -133,7 +133,7 @@
fd,
fd_ref_counter,
header = #db_header{},
- summary_stream,
+ committed_update_seq,
fulldocinfo_by_id_btree,
docinfo_by_seq_btree,
local_docs_btree,
Modified: couchdb/branches/tail_header/src/couchdb/couch_db_updater.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_db_updater.erl?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_db_updater.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_db_updater.erl Sun May 17 21:31:17 2009
@@ -19,18 +19,18 @@
-include("couch_db.hrl").
--define(HEADER_SIG, <<$g, $m, $k, 0>>).
init({MainPid, DbName, Filepath, Fd, Options}) ->
+ io:format("Init fd:~p~n", [Fd]),
case lists:member(create, Options) of
true ->
% create a new header and writes it to the file
Header = #db_header{},
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header),
+ ok = couch_file:write_header(Fd, Header),
% delete any old compaction files that might be hanging around
file:delete(Filepath ++ ".compact");
false ->
- {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG)
+ {ok, Header} = couch_file:read_header(Fd)
end,
Db = init_db(DbName, Filepath, Fd, Header),
@@ -56,8 +56,9 @@
end;
handle_call(full_commit, _From, #db{waiting_delayed_commit=nil}=Db) ->
{reply, ok, Db}; % no data waiting, return ok immediately
-handle_call(full_commit, _From, Db) ->
- {reply, ok, commit_data(Db)}; % commit the data and return ok
+handle_call(full_commit, _From, #db{fd=Fd,update_seq=Seq}=Db) ->
+ ok = couch_file:sync(Fd),
+ {reply, ok, Db#db{waiting_delayed_commit=nil,committed_update_seq=Seq}}; % commit the data and return ok
handle_call(increment_update_seq, _From, Db) ->
Db2 = commit_data(Db#db{update_seq=Db#db.update_seq+1}),
ok = gen_server:call(Db2#db.main_pid, {db_updated, Db2}),
@@ -158,7 +159,7 @@
end;
handle_cast({compact_done, CompactFilepath}, #db{filepath=Filepath}=Db) ->
{ok, NewFd} = couch_file:open(CompactFilepath),
- {ok, NewHeader} = couch_file:read_header(NewFd, ?HEADER_SIG),
+ {ok, NewHeader} = couch_file:read_header(NewFd),
#db{update_seq=NewSeq} = NewDb =
init_db(Db#db.name, Filepath, NewFd, NewHeader),
unlink(NewFd),
@@ -190,8 +191,9 @@
{noreply, Db2}
end.
-handle_info(delayed_commit, Db) ->
- {noreply, commit_data(Db#db{waiting_delayed_commit=nil})}.
+handle_info(delayed_commit, #db{update_seq=Seq}=Db) ->
+ ok = couch_file:sync(Db#db.fd),
+ {noreply, Db#db{waiting_delayed_commit=nil,committed_update_seq=Seq}}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -287,8 +289,6 @@
_ -> throw({database_disk_version_error, "Incorrect disk header version"})
end,
Header = simple_upgrade_record(Header0, #db_header{}),
- {ok, SummaryStream} = couch_stream:open(Header#db_header.summary_stream_state, Fd),
- ok = couch_stream:set_min_buffer(SummaryStream, 10000),
Less = fun less_docid/2,
{ok, IdBtree} = couch_btree:open(Header#db_header.fulldocinfo_by_id_btree_state, Fd,
@@ -319,7 +319,6 @@
fd=Fd,
fd_ref_counter = RefCntr,
header=Header,
- summary_stream = SummaryStream,
fulldocinfo_by_id_btree = IdBtree,
docinfo_by_seq_btree = SeqBtree,
local_docs_btree = LocalDocsBtree,
@@ -333,8 +332,7 @@
}.
-close_db(#db{ fd_ref_counter = RefCntr, summary_stream = SummaryStream}) ->
- couch_stream:close(SummaryStream),
+close_db(#db{fd_ref_counter = RefCntr}) ->
couch_ref_counter:drop(RefCntr).
@@ -387,7 +385,7 @@
?LOG_DEBUG("File where the attachments are written has changed. Possibly retrying.", []),
throw(retry)
end,
- {ok, NewSummaryPointer} = couch_stream:write_term(Db#db.summary_stream, {Doc#doc.body, Bins}),
+ {ok, NewSummaryPointer} = couch_file:append_term(Fd, {Doc#doc.body, Bins}),
{IsDeleted, NewSummaryPointer, UpdateSeq};
_ ->
Value
@@ -553,7 +551,6 @@
commit_data(#db{fd=Fd, header=Header} = Db, Delay) ->
Header2 = Header#db_header{
update_seq = Db#db.update_seq,
- summary_stream_state = couch_stream:get_state(Db#db.summary_stream),
docinfo_by_seq_btree_state = couch_btree:get_state(Db#db.docinfo_by_seq_btree),
fulldocinfo_by_id_btree_state = couch_btree:get_state(Db#db.fulldocinfo_by_id_btree),
local_docs_btree_state = couch_btree:get_state(Db#db.local_docs_btree),
@@ -563,53 +560,56 @@
if Header == Header2 ->
Db;
Delay and (Db#db.waiting_delayed_commit == nil) ->
+ ok = couch_file:write_header(Fd, Header2),
Db#db{waiting_delayed_commit=
erlang:send_after(1000, self(), delayed_commit)};
Delay ->
- Db;
+ ok = couch_file:write_header(Fd, Header2),
+ Db#db{header=Header2};
true ->
- if Db#db.waiting_delayed_commit /= nil ->
+ if not is_atom(Db#db.waiting_delayed_commit) ->
case erlang:cancel_timer(Db#db.waiting_delayed_commit) of
false -> receive delayed_commit -> ok after 0 -> ok end;
_ -> ok
end;
true -> ok
end,
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header2),
- Db#db{waiting_delayed_commit=nil,header=Header2}
+ ok = couch_file:write_header(Fd, Header2),
+ ok = couch_file:sync(Fd),
+ Db#db{waiting_delayed_commit=nil,header=Header2,committed_update_seq=Db#db.update_seq}
end.
-copy_raw_doc(SrcFd, SrcSp, DestFd, DestStream) ->
- {ok, {BodyData, BinInfos}} = couch_stream:read_term(SrcFd, SrcSp),
+copy_raw_doc(SrcFd, SrcSp, DestFd) ->
+ {ok, {BodyData, BinInfos}} = couch_file:pread_term(SrcFd, SrcSp),
% copy the bin values
NewBinInfos = lists:map(fun({Name, {Type, BinSp, Len}}) ->
- {ok, NewBinSp} = couch_stream:copy_to_new_stream(SrcFd, BinSp, Len, DestFd),
+ {NewBinSp, Len} = couch_stream:copy_to_new_stream(SrcFd, BinSp, DestFd),
{Name, {Type, NewBinSp, Len}}
end, BinInfos),
% now write the document summary
- {ok, Sp} = couch_stream:write_term(DestStream, {BodyData, NewBinInfos}),
+ {ok, Sp} = couch_file:append_term(DestFd, {BodyData, NewBinInfos}),
Sp.
-copy_rev_tree(_SrcFd, _DestFd, _DestStream, []) ->
+copy_rev_tree(_SrcFd, _DestFd, []) ->
[];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{Start, Tree} | RestTree]) ->
+copy_rev_tree(SrcFd, DestFd, [{Start, Tree} | RestTree]) ->
% root nner node, only copy info/data from leaf nodes
- [Tree2] = copy_rev_tree(SrcFd, DestFd, DestStream, [Tree]),
- [{Start, Tree2} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
+ [Tree2] = copy_rev_tree(SrcFd, DestFd, [Tree]),
+ [{Start, Tree2} | copy_rev_tree(SrcFd, DestFd, RestTree)];
+copy_rev_tree(SrcFd, DestFd, [{RevId, {IsDel, Sp, Seq}, []} | RestTree]) ->
% This is a leaf node, copy it over
- NewSp = copy_raw_doc(SrcFd, Sp, DestFd, DestStream),
- [{RevId, {IsDel, NewSp, Seq}, []} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)];
-copy_rev_tree(SrcFd, DestFd, DestStream, [{RevId, _, SubTree} | RestTree]) ->
+ NewSp = copy_raw_doc(SrcFd, Sp, DestFd),
+ [{RevId, {IsDel, NewSp, Seq}, []} | copy_rev_tree(SrcFd, DestFd, RestTree)];
+copy_rev_tree(SrcFd, DestFd, [{RevId, _, SubTree} | RestTree]) ->
% inner node, only copy info/data from leaf nodes
- [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, DestStream, SubTree)} | copy_rev_tree(SrcFd, DestFd, DestStream, RestTree)].
+ [{RevId, ?REV_MISSING, copy_rev_tree(SrcFd, DestFd, SubTree)} | copy_rev_tree(SrcFd, DestFd, RestTree)].
-copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd,summary_stream=DestStream}=NewDb, InfoBySeq, Retry) ->
+copy_docs(#db{fd=SrcFd}=Db, #db{fd=DestFd}=NewDb, InfoBySeq, Retry) ->
Ids = [Id || #doc_info{id=Id} <- InfoBySeq],
LookupResults = couch_btree:lookup(Db#db.fulldocinfo_by_id_btree, Ids),
NewFullDocInfos0 = lists:map(
fun({ok, #full_doc_info{rev_tree=RevTree}=Info}) ->
- Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, DestStream, RevTree)}
+ Info#full_doc_info{rev_tree=copy_rev_tree(SrcFd, DestFd, RevTree)}
end, LookupResults),
NewFullDocInfos = stem_full_doc_infos(Db, NewFullDocInfos0),
NewDocInfos = [couch_doc:to_doc_info(Info) || Info <- NewFullDocInfos],
@@ -642,7 +642,7 @@
if TotalCopied rem 1000 == 0 ->
NewDb2 = copy_docs(Db, AccNewDb, lists:reverse([DocInfo | AccUncopied]), Retry),
if TotalCopied rem 10000 == 0 ->
- {ok, {commit_data(NewDb2#db{update_seq=Seq}), [], TotalCopied + 1}};
+ {ok, {commit_data(NewDb2#db{update_seq=Seq}, true), [], TotalCopied + 1}};
true ->
{ok, {NewDb2#db{update_seq=Seq}, [], TotalCopied + 1}}
end;
@@ -668,7 +668,7 @@
NewDb4 = NewDb3
end,
- commit_data(NewDb4#db{update_seq=Db#db.update_seq}).
+ commit_data(NewDb4#db{update_seq=Db#db.update_seq}, true).
start_copy_compact(#db{name=Name,filepath=Filepath}=Db) ->
CompactFile = Filepath ++ ".compact",
@@ -677,16 +677,16 @@
{ok, Fd} ->
couch_task_status:add_task(<<"Database Compaction">>, <<Name/binary, " retry">>, <<"Starting">>),
Retry = true,
- {ok, Header} = couch_file:read_header(Fd, ?HEADER_SIG);
+ {ok, Header} = couch_file:read_header(Fd);
{error, enoent} ->
couch_task_status:add_task(<<"Database Compaction">>, Name, <<"Starting">>),
{ok, Fd} = couch_file:open(CompactFile, [create]),
Retry = false,
- ok = couch_file:write_header(Fd, ?HEADER_SIG, Header=#db_header{})
+ ok = couch_file:write_header(Fd, Header=#db_header{})
end,
NewDb = init_db(Name, CompactFile, Fd, Header),
unlink(Fd),
- NewDb2 = copy_compact(Db, NewDb, Retry),
+ NewDb2 = copy_compact(Db, NewDb#db{waiting_delayed_commit=never}, Retry),
gen_server:cast(Db#db.update_pid, {compact_done, CompactFile}),
close_db(NewDb2).
Modified: couchdb/branches/tail_header/src/couchdb/couch_doc.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_doc.erl?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_doc.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_doc.erl Sun May 17 21:31:17 2009
@@ -250,13 +250,9 @@
bin_foldl(Bin, Fun, Acc) when is_binary(Bin) ->
- case Fun(Bin, Acc) of
- {ok, Acc2} -> {ok, Acc2};
- {done, Acc2} -> {ok, Acc2}
- end;
-bin_foldl({Fd, Sp, Len}, Fun, Acc) ->
- {ok, Acc2, _Sp2} = couch_stream:foldl(Fd, Sp, Len, Fun, Acc),
- {ok, Acc2}.
+ Fun(Bin, Acc);
+bin_foldl({Fd, Sp, _Len}, Fun, Acc) ->
+ couch_stream:foldl(Fd, Sp, Fun, Acc).
bin_size(Bin) when is_binary(Bin) ->
size(Bin);
@@ -265,9 +261,8 @@
bin_to_binary(Bin) when is_binary(Bin) ->
Bin;
-bin_to_binary({Fd, Sp, Len}) ->
- {ok, Bin, _Sp2} = couch_stream:read(Fd, Sp, Len),
- Bin.
+bin_to_binary({Fd, Sp, _Len}) ->
+ couch_stream:foldl(Fd, Sp, fun(Bin, Acc) -> [Bin|Acc] end, []).
get_validate_doc_fun(#doc{body={Props}}) ->
Lang = proplists:get_value(<<"language">>, Props, <<"javascript">>),
Modified: couchdb/branches/tail_header/src/couchdb/couch_file.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_file.erl?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_file.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_file.erl Sun May 17 21:31:17 2009
@@ -15,10 +15,11 @@
-include("couch_db.hrl").
--define(HEADER_SIZE, 2048). % size of each segment of the doubly written header
+-define(SIZE_BLOCK, 4096).
--export([open/1, open/2, close/1, pread/3, pwrite/3, expand/2, bytes/1, sync/1]).
--export([append_term/2, pread_term/2,write_header/3, read_header/2, truncate/2]).
+-export([open/1, open/2, close/1, bytes/1, sync/1, append_binary/2]).
+-export([append_term/2, pread_term/2, pread_iolist/2, write_header/2]).
+-export([pread_binary/2, read_header/1, truncate/2]).
-export([init/1, terminate/2, handle_call/3, handle_cast/2, code_change/3, handle_info/2]).
%%----------------------------------------------------------------------
@@ -52,39 +53,6 @@
%%----------------------------------------------------------------------
-%% Args: Pos is the offset from the beginning of the file, Bytes is
-%% is the number of bytes to read.
-%% Returns: {ok, Binary} where Binary is a binary data from disk
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-pread(Fd, Pos, Bytes) when Bytes > 0 ->
- gen_server:call(Fd, {pread, Pos, Bytes}, infinity).
-
-
-%%----------------------------------------------------------------------
-%% Args: Pos is the offset from the beginning of the file, Bin is
-%% is the binary to write
-%% Returns: ok
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-pwrite(Fd, Pos, Bin) ->
- gen_server:call(Fd, {pwrite, Pos, Bin}, infinity).
-
-%%----------------------------------------------------------------------
-%% Purpose: To append a segment of zeros to the end of the file.
-%% Args: Bytes is the number of bytes to append to the file.
-%% Returns: {ok, Pos} where Pos is the file offset to the beginning of
-%% the new segments.
-%% or {error, Reason}.
-%%----------------------------------------------------------------------
-
-expand(Fd, Bytes) when Bytes > 0 ->
- gen_server:call(Fd, {expand, Bytes}, infinity).
-
-
-%%----------------------------------------------------------------------
%% Purpose: To append an Erlang term to the end of the file.
%% Args: Erlang term to serialize and append to the file.
%% Returns: {ok, Pos} where Pos is the file offset to the beginning the
@@ -105,7 +73,9 @@
%%----------------------------------------------------------------------
append_binary(Fd, Bin) ->
- gen_server:call(Fd, {append_bin, Bin}, infinity).
+ Size = iolist_size(Bin),
+ SizePrependedBin = iolist_to_binary([<<Size:32/integer>>, Bin]),
+ gen_server:call(Fd, {append_bin, SizePrependedBin}, infinity).
%%----------------------------------------------------------------------
@@ -119,6 +89,7 @@
{ok, Bin} = pread_binary(Fd, Pos),
{ok, binary_to_term(Bin)}.
+
%%----------------------------------------------------------------------
%% Purpose: Reads a binrary from a file that was written with append_binary
%% Args: Pos, the offset into the file where the term is serialized.
@@ -127,8 +98,22 @@
%%----------------------------------------------------------------------
pread_binary(Fd, Pos) ->
- gen_server:call(Fd, {pread_bin, Pos}, infinity).
+ {ok, L} = pread_iolist(Fd, Pos),
+ {ok, iolist_to_binary(L)}.
+pread_iolist(Fd, Pos) ->
+ {ok, LenIolist, NextPos} =read_raw_iolist(Fd, Pos, 4),
+ <<Len:32/integer>> = iolist_to_binary(LenIolist),
+ {ok, Iolist, _} = read_raw_iolist(Fd, NextPos, Len),
+ {ok, Iolist}.
+
+read_raw_iolist(Fd, Pos, Len) when (Pos rem ?SIZE_BLOCK) == 0 ->
+ read_raw_iolist(Fd, Pos + 1, Len);
+read_raw_iolist(Fd, Pos, Len) ->
+ BlockOffset = Pos rem ?SIZE_BLOCK,
+ TotalBytes = calculate_total_read_len(BlockOffset, Len),
+ {ok, <<RawBin:TotalBytes/binary>>} = gen_server:call(Fd, {pread, Pos, TotalBytes}, infinity),
+ {ok, remove_block_prefixes(BlockOffset, RawBin), Pos + TotalBytes}.
%%----------------------------------------------------------------------
%% Purpose: The length of a file, in bytes.
@@ -167,99 +152,22 @@
catch unlink(Fd),
Result.
-
-write_header(Fd, Prefix, Data) ->
- TermBin = term_to_binary(Data),
- % the size of all the bytes written to the header, including the md5 signature (16 bytes)
- FilledSize = size(Prefix) + size(TermBin) + 16,
- {TermBin2, FilledSize2} =
- case FilledSize > ?HEADER_SIZE of
- true ->
- % too big!
- {ok, Pos} = append_binary(Fd, TermBin),
- PtrBin = term_to_binary({pointer_to_header_data, Pos}),
- {PtrBin, size(Prefix) + size(PtrBin) + 16};
- false ->
- {TermBin, FilledSize}
- end,
- ok = sync(Fd),
- % pad out the header with zeros, then take the md5 hash
- PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
- Sig = erlang:md5([TermBin2, PadZeros]),
- % now we assemble the final header binary and write to disk
- WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
- ?HEADER_SIZE = size(WriteBin), % sanity check
- DblWriteBin = [WriteBin, WriteBin],
- ok = pwrite(Fd, 0, DblWriteBin),
- ok = sync(Fd).
-
-
-read_header(Fd, Prefix) ->
- {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)),
- <<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
- Result =
- % read the first header
- case extract_header(Prefix, Bin1) of
- {ok, Header1} ->
- case extract_header(Prefix, Bin2) of
- {ok, Header2} ->
- case Header1 == Header2 of
- true ->
- % Everything is completely normal!
- {ok, Header1};
- false ->
- % To get here we must have two different header versions with signatures intact.
- % It's weird but possible (a commit failure right at the 2k boundary). Log it and take the first.
- ?LOG_INFO("Header version differences.~nPrimary Header: ~p~nSecondary Header: ~p", [Header1, Header2]),
- {ok, Header1}
- end;
- Error ->
- % error reading second header. It's ok, but log it.
- ?LOG_INFO("Secondary header corruption (error: ~p). Using primary header.", [Error]),
- {ok, Header1}
- end;
- Error ->
- % error reading primary header
- case extract_header(Prefix, Bin2) of
- {ok, Header2} ->
- % log corrupt primary header. It's ok since the secondary is still good.
- ?LOG_INFO("Primary header corruption (error: ~p). Using secondary header.", [Error]),
- {ok, Header2};
- _ ->
- % error reading secondary header too
- % return the error, no need to log anything as the caller will be responsible for dealing with the error.
- Error
- end
- end,
- case Result of
- {ok, {pointer_to_header_data, Ptr}} ->
- pread_term(Fd, Ptr);
- _ ->
- Result
+read_header(Fd) ->
+ case gen_server:call(Fd, find_header, infinity) of
+ {ok, Bin} ->
+ {ok, binary_to_term(Bin)};
+ Else ->
+ Else
end.
-extract_header(Prefix, Bin) ->
- SizeOfPrefix = size(Prefix),
- SizeOfTermBin = ?HEADER_SIZE -
- SizeOfPrefix -
- 16, % md5 sig
-
- <<HeaderPrefix:SizeOfPrefix/binary, TermBin:SizeOfTermBin/binary, Sig:16/binary>> = Bin,
-
- % check the header prefix
- case HeaderPrefix of
- Prefix ->
- % check the integrity signature
- case erlang:md5(TermBin) == Sig of
- true ->
- Header = binary_to_term(TermBin),
- {ok, Header};
- false ->
- header_corrupt
- end;
- _ ->
- unknown_header_type
- end.
+write_header(Fd, Data) ->
+ Bin = term_to_binary(Data),
+ Md5 = erlang:md5(Bin),
+ % now we assemble the final header binary and write to disk
+ FinalBin = <<Md5/binary, Bin/binary>>,
+ gen_server:call(Fd, {write_header, FinalBin}, infinity).
+
+
init_status_error(ReturnPid, Ref, Error) ->
@@ -319,11 +227,6 @@
handle_call({pread, Pos, Bytes}, _From, Fd) ->
{reply, file:pread(Fd, Pos, Bytes), Fd};
-handle_call({pwrite, Pos, Bin}, _From, Fd) ->
- {reply, file:pwrite(Fd, Pos, Bin), Fd};
-handle_call({expand, Num}, _From, Fd) ->
- {ok, Pos} = file:position(Fd, eof),
- {reply, {file:pwrite(Fd, Pos + Num - 1, <<0>>), Pos}, Fd};
handle_call(bytes, _From, Fd) ->
{reply, file:position(Fd, eof), Fd};
handle_call(sync, _From, Fd) ->
@@ -332,16 +235,33 @@
{ok, Pos} = file:position(Fd, Pos),
{reply, file:truncate(Fd), Fd};
handle_call({append_bin, Bin}, _From, Fd) ->
- Len = size(Bin),
- Bin2 = <<Len:32, Bin/binary>>,
{ok, Pos} = file:position(Fd, eof),
- {reply, {file:pwrite(Fd, Pos, Bin2), Pos}, Fd};
-handle_call({pread_bin, Pos}, _From, Fd) ->
- {ok, <<TermLen:32>>} = file:pread(Fd, Pos, 4),
- {ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
- {reply, {ok, Bin}, Fd}.
-
-
+ Blocks = make_blocks(Pos rem ?SIZE_BLOCK, Bin),
+ case file:pwrite(Fd, Pos, Blocks) of
+ ok ->
+ {reply, {ok, Pos}, Fd};
+ Error ->
+ {reply, Error, Fd}
+ end;
+handle_call({write_header, Bin}, _From, Fd) ->
+ {ok, Pos} = file:position(Fd, eof),
+ BinSize = size(Bin),
+ case Pos rem ?SIZE_BLOCK of
+ 0 ->
+ io:format("Writing header at block:~p~n", [(Pos div ?SIZE_BLOCK)]),
+ Padding = <<>>;
+ BlockOffset ->
+ io:format("Writing header at block:~p~n", [(Pos div ?SIZE_BLOCK) + 1]),
+ Padding = <<0:(8*(?SIZE_BLOCK-BlockOffset))>>
+ end,
+ FinalBin = [Padding, <<1, BinSize:32/integer>> | make_blocks(1, Bin)],
+ {reply, file:pwrite(Fd, Pos, FinalBin), Fd};
+handle_call(find_header, _From, Fd) ->
+ {ok, Pos} = file:position(Fd, eof),
+ {reply, find_header(Fd, Pos div ?SIZE_BLOCK), Fd}.
+
+
+
handle_cast(close, Fd) ->
{stop,normal,Fd}.
@@ -351,3 +271,64 @@
handle_info({'EXIT', _, Reason}, Fd) ->
{stop, Reason, Fd}.
+
+
+find_header(_Fd, -1) ->
+ no_valid_header;
+find_header(Fd, Block) ->
+ case (catch load_header(Fd, Block)) of
+ {ok, Bin} ->
+ io:format("Found header at block:~p~n", [Block]),
+ {ok, Bin};
+ _Error ->
+ find_header(Fd, Block -1)
+ end.
+
+load_header(Fd, Block) ->
+ {ok, <<1>>} = file:pread(Fd, Block*?SIZE_BLOCK, 1),
+ {ok, <<HeaderLen:32/integer>>} = file:pread(Fd, (Block*?SIZE_BLOCK) + 1, 4),
+ TotalBytes = calculate_total_read_len(1, HeaderLen),
+ {ok, <<RawBin:TotalBytes/binary>>} =
+ file:pread(Fd, (Block*?SIZE_BLOCK) + 5, TotalBytes),
+ io:format("Foo:~p~n", [RawBin]),
+ <<Md5Sig:16/binary, HeaderBin/binary>> =
+ iolist_to_binary(remove_block_prefixes(1, RawBin)),
+ Md5Sig = erlang:md5(HeaderBin),
+ {ok, HeaderBin}.
+
+
+calculate_total_read_len(BlockOffset, FinalLen) ->
+ case ?SIZE_BLOCK - BlockOffset of
+ BlockLeft when BlockLeft >= FinalLen ->
+ FinalLen;
+ BlockLeft ->
+ FinalLen + ((FinalLen - BlockLeft) div (?SIZE_BLOCK -1)) +
+ if ((FinalLen - BlockLeft) rem (?SIZE_BLOCK -1)) == 0 -> 0;
+ true -> 1 end
+ end.
+
+remove_block_prefixes(_BlockOffset, <<>>) ->
+ [];
+remove_block_prefixes(0, <<_BlockPrefix,Rest/binary>>) ->
+ remove_block_prefixes(1, Rest);
+remove_block_prefixes(BlockOffset, Bin) ->
+ BlockBytesAvailable = ?SIZE_BLOCK - BlockOffset,
+ case size(Bin) of
+ Size when Size > BlockBytesAvailable ->
+ <<DataBlock:BlockBytesAvailable/binary,Rest/binary>> = Bin,
+ [DataBlock | remove_block_prefixes(0, Rest)];
+ _Size ->
+ [Bin]
+ end.
+
+make_blocks(_BlockOffset, <<>>) ->
+ [];
+make_blocks(0, Bin) ->
+ [<<0>> | make_blocks(1, Bin)];
+make_blocks(BlockOffset, Bin) when size(Bin) =< (?SIZE_BLOCK - BlockOffset) ->
+ [Bin];
+make_blocks(BlockOffset, Bin) ->
+ BlockBytes = (?SIZE_BLOCK - BlockOffset),
+ <<BlockBin:BlockBytes/binary, Rest/binary>> = Bin,
+ [BlockBin | make_blocks(0, Rest)].
+
Modified: couchdb/branches/tail_header/src/couchdb/couch_httpd_db.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_httpd_db.erl?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_httpd_db.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_httpd_db.erl Sun May 17 21:31:17 2009
@@ -720,12 +720,7 @@
% {"Content-Length", integer_to_list(couch_doc:bin_size(Bin))}
]),
couch_doc:bin_foldl(Bin,
- fun(BinSegment, []) ->
- send_chunk(Resp, BinSegment),
- {ok, []}
- end,
- []
- ),
+ fun(BinSegment, _) -> send_chunk(Resp, BinSegment) end,[]),
send_chunk(Resp, "")
end;
Modified: couchdb/branches/tail_header/src/couchdb/couch_stream.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_stream.erl?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_stream.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_stream.erl Sun May 17 21:31:17 2009
@@ -13,144 +13,96 @@
-module(couch_stream).
-behaviour(gen_server).
--export([test/1]).
--export([open/1, open/2, close/1, read/3, read_term/2, write/2, write_term/2, get_state/1, foldl/5]).
--export([copy/4, copy_to_new_stream/4]).
--export([ensure_buffer/2, set_min_buffer/2]).
+-export([test/0]).
+-export([open/1, close/1, write/2, foldl/4]).
+-export([copy_to_new_stream/3]).
-export([init/1, terminate/2, handle_call/3]).
-export([handle_cast/2,code_change/3,handle_info/2]).
-include("couch_db.hrl").
--define(FILE_POINTER_BYTES, 8).
--define(FILE_POINTER_BITS, 8*(?FILE_POINTER_BYTES)).
-
--define(STREAM_OFFSET_BYTES, 4).
--define(STREAM_OFFSET_BITS, 8*(?STREAM_OFFSET_BYTES)).
-
--define(HUGE_CHUNK, 1000000000). % Huge chuck size when reading all in one go
-
--define(DEFAULT_STREAM_CHUNK, 16#00100000). % 1 meg chunks when streaming data
-
-
--record(write_stream,
- {fd = 0,
- current_pos = 0,
- bytes_remaining = 0,
- next_alloc = 0,
- min_alloc = 16#00010000
- }).
-
+%% State of a stream process: data passed to write/2 is buffered and
+%% appended to the file in chunks with couch_file:append_binary/2.
-record(stream,
- {
- pid,
- fd
+ % fd: the couch_file the chunks are appended to
+ {fd = 0,
+ % positions of the chunks appended so far, most recent first
+ written_pointers=[],
+ % writes not yet appended, most recent first
+ buffer_list = [],
+ % total size in bytes of buffer_list
+ buffer_len = 0,
+ % append once buffer_len plus the incoming write would exceed this
+ max_buffer = 4096,
+ % total bytes already appended to fd
+ written_len = 0
 }).
%%% Interface functions %%%
+%% Start a stream process, linked to the caller, that appends to the
+%% couch_file Fd. Returns the result of gen_server:start_link/3.
open(Fd) ->
- open(nil, Fd).
+ gen_server:start_link(couch_stream, Fd, []).
-open(nil, Fd) ->
- open({0,0}, Fd);
-open(State, Fd) ->
- {ok, Pid} = gen_server:start_link(couch_stream, {State, Fd}, []),
- {ok, #stream{pid = Pid, fd = Fd}}.
-
-close(#stream{pid = Pid, fd = _Fd}) ->
+%% Append any still-buffered data and stop the stream process.
+%% Returns {PosList, TotalLen}, where PosList lists chunk positions
+%% most recent first and TotalLen is the number of bytes written.
+close(Pid) ->
 gen_server:call(Pid, close, infinity).
-get_state(#stream{pid = Pid, fd = _Fd}) ->
- gen_server:call(Pid, get_state, infinity).
-
-ensure_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
- gen_server:call(Pid, {ensure_buffer, Bytes}).
+%% Read the chunks at PosList from Fd, in list order, and write them into a
+%% new stream on DestFd. Returns close/1's {PosList, TotalLen} for the new
+%% stream.
+%% NOTE(review): close/1 returns positions most recent first, so a PosList
+%% passed straight from close/1 would be copied in reverse write order -
+%% confirm whether callers reverse it first.
+copy_to_new_stream(Fd, PosList, DestFd) ->
+ {ok, Dest} = open(DestFd),
+ foldl(Fd, PosList,
+ fun(Bin, _) ->
+ ok = write(Dest, Bin)
+ end, ok),
+ close(Dest).
-set_min_buffer(#stream{pid = Pid, fd = _Fd}, Bytes) ->
- gen_server:call(Pid, {set_min_buffer, Bytes}).
-read(#stream{pid = _Pid, fd = Fd}, Sp, Num) ->
- read(Fd, Sp, Num);
-read(Fd, Sp, Num) ->
- {ok, RevBin, Sp2} = stream_data(Fd, Sp, Num, ?HUGE_CHUNK, fun(Bin, Acc) -> {ok, [Bin | Acc]} end, []),
- Bin = list_to_binary(lists:reverse(RevBin)),
- {ok, Bin, Sp2}.
+%% Fold Fun(Chunk, Acc) over the chunks stored at each position in PosList,
+%% in list order. Chunk is whatever couch_file:pread_iolist/2 returns, and
+%% Fun returns the new accumulator directly (not wrapped in {ok, _}).
+foldl(_Fd, [], _Fun, Acc) ->
+ Acc;
+foldl(Fd, [Pos|Rest], Fun, Acc) ->
+ {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
+ foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
-copy_to_new_stream(Src, Sp, Len, DestFd) ->
- {ok, Dest} = open(DestFd),
- ok = set_min_buffer(Dest, 0),
- {ok, NewSp} = copy(Src, Sp, Len, Dest),
- close(Dest),
- {ok, NewSp}.
-
-copy(#stream{pid = _Pid, fd = Fd}, Sp, Len, DestStream) ->
- copy(Fd, Sp, Len, DestStream);
-copy(Fd, Sp, Len, DestStream) ->
- {ok, NewSp, _Sp2} = stream_data(Fd, Sp, Len, ?HUGE_CHUNK,
- fun(Bin, AccPointer) ->
- {ok, NewPointer} = write(DestStream, Bin),
- {ok, if AccPointer == null -> NewPointer; true -> AccPointer end}
- end,
- null),
- {ok, NewSp}.
-
-foldl(#stream{pid = _Pid, fd = Fd}, Sp, Num, Fun, Acc) ->
- foldl(Fd, Sp, Num, Fun, Acc);
-foldl(Fd, Sp, Num, Fun, Acc) ->
- {ok, _Acc, _Sp} = stream_data(Fd, Sp, Num, ?DEFAULT_STREAM_CHUNK, Fun, Acc).
-
-read_term(#stream{pid = _Pid, fd = Fd}, Sp) ->
- read_term(Fd, Sp);
-read_term(Fd, Sp) ->
- {ok, <<TermLen:(?STREAM_OFFSET_BITS)>>, Sp2}
- = read(Fd, Sp, ?STREAM_OFFSET_BYTES),
- {ok, Bin, _Sp3} = read(Fd, Sp2, TermLen),
- {ok, binary_to_term(Bin)}.
-
-write_term(Stream, Term) ->
- Bin = term_to_binary(Term),
- Size = size(Bin),
- Bin2 = <<Size:(?STREAM_OFFSET_BITS), Bin/binary>>,
- write(Stream, Bin2).
-
-write(#stream{}, <<>>) ->
- {ok, {0,0}};
-write(#stream{pid = Pid}, Bin) when is_binary(Bin) ->
+%% Hand Bin to the stream process for buffering or appending; returns ok.
+%% An empty binary is ignored without contacting the process. There is no
+%% is_binary guard, and the server sizes Bin with iolist_size/1, so an
+%% iolist is accepted as well.
+write(_Pid, <<>>) ->
+ ok;
+write(Pid, Bin) ->
 gen_server:call(Pid, {write, Bin}, infinity).
-init({{Pos, BytesRemaining}, Fd}) ->
- {ok, #write_stream
- {fd = Fd,
- current_pos = Pos,
- bytes_remaining = BytesRemaining
- }}.
+%% gen_server callback: start with an empty buffer that writes to Fd.
+init(Fd) ->
+ {ok, #stream{fd = Fd}}.
+%% gen_server callback: nothing to clean up. Buffered data is only
+%% appended by the close call.
terminate(_Reason, _Stream) ->
 ok.
-handle_call(get_state, _From, Stream) ->
- #write_stream{current_pos = Pos, bytes_remaining = BytesRemaining} = Stream,
- {reply, {Pos, BytesRemaining}, Stream};
-handle_call({set_min_buffer, MinBuffer}, _From, Stream) ->
- {reply, ok, Stream#write_stream{min_alloc = MinBuffer}};
-% set next_alloc if we need more room
-handle_call({ensure_buffer, BufferSizeRequested}, _From, Stream) ->
- #write_stream{bytes_remaining = BytesRemainingInCurrentBuffer} = Stream,
- case BytesRemainingInCurrentBuffer < BufferSizeRequested of
- true -> NextAlloc = BufferSizeRequested - BytesRemainingInCurrentBuffer;
- false -> NextAlloc = 0 % enough room in current segment
- end,
- {reply, ok, Stream#write_stream{next_alloc = NextAlloc}};
+%% {write, Bin}: buffer Bin. If the buffered bytes plus Bin would exceed
+%% max_buffer, append everything buffered plus Bin as a single chunk and
+%% record its position. Always replies ok.
handle_call({write, Bin}, _From, Stream) ->
- % ensure init is called first so we can get a pointer to the begining of the binary
- {ok, Sp, Stream2} = write_data(Stream, Bin),
- {reply, {ok, Sp}, Stream2};
+ BinSize = iolist_size(Bin),
+ #stream{
+ fd = Fd,
+ written_len = WrittenLen,
+ written_pointers = Written,
+ buffer_len = BufferLen,
+ buffer_list = Buffer,
+ max_buffer = Max} = Stream,
+ if BinSize + BufferLen > Max ->
+ % lists:reverse/2 restores write order and puts Bin last; when Bin is
+ % a binary the result is an improper list, which is still an iolist
+ {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer, Bin)),
+ {reply, ok, Stream#stream{
+ written_len=WrittenLen + BufferLen + BinSize,
+ written_pointers=[Pos|Written],
+ buffer_list=[],
+ buffer_len=0}};
+ true ->
+ % still within max_buffer: just remember Bin (most recent first)
+ {reply, ok, Stream#stream{
+ buffer_list=[Bin|Buffer],
+ buffer_len=BufferLen + BinSize}}
+ end;
+%% close: append any still-buffered data as a final chunk, then stop.
+%% Replies {PosList, TotalLen}; PosList is most recent chunk first.
handle_call(close, _From, Stream) ->
- #write_stream{current_pos=Pos, bytes_remaining = BytesRemaining} = Stream,
- {stop, normal, {ok, {Pos, BytesRemaining}}, Stream}.
+ #stream{
+ fd = Fd,
+ written_len = WrittenLen,
+ written_pointers = Written,
+ buffer_len = BufferLen,
+ buffer_list = Buffer} = Stream,
+
+ case Buffer of
+ [] ->
+ % nothing pending: no final append needed
+ Result = {Written, WrittenLen};
+ _ ->
+ % Buffer is most recent first, so reverse it back to write order
+ {ok, Pos} = couch_file:append_binary(Fd, lists:reverse(Buffer)),
+ Result = {[Pos|Written], WrittenLen + BufferLen}
+ end,
+ {stop, normal, Result, Stream}.
+%% gen_server callback: casts are ignored.
handle_cast(_Msg, State) ->
 {noreply,State}.
@@ -161,100 +113,40 @@
+%% gen_server callback: unexpected messages are ignored.
handle_info(_Info, State) ->
 {noreply, State}.
-%%% Internal function %%%
-
-stream_data(_Fd, Sp, 0, _MaxChunk, _Fun, Acc) ->
- {ok, Acc, Sp};
-stream_data(Fd, {Pos, 0}, Num, MaxChunk, Fun, Acc) ->
- {ok, <<NextPos:(?FILE_POINTER_BITS), NextOffset:(?STREAM_OFFSET_BITS)>>}
- = couch_file:pread(Fd, Pos, ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
- Sp = {NextPos, NextOffset},
- % Check NextPos is past current Pos (this is always true in a stream)
- % Guards against potential infinite loops caused by corruption.
- case NextPos > Pos of
- true -> ok;
- false -> throw({error, stream_corruption})
- end,
- stream_data(Fd, Sp, Num, MaxChunk, Fun, Acc);
-stream_data(Fd, {Pos, Offset}, Num, MaxChunk, Fun, Acc) ->
- ReadAmount = lists:min([MaxChunk, Num, Offset]),
- {ok, Bin} = couch_file:pread(Fd, Pos, ReadAmount),
- Sp = {Pos + ReadAmount, Offset - ReadAmount},
- case Fun(Bin, Acc) of
- {ok, Acc2} ->
- stream_data(Fd, Sp, Num - ReadAmount, MaxChunk, Fun, Acc2);
- {stop, Acc2} ->
- {ok, Acc2, Sp}
- end.
-
-write_data(Stream, <<>>) ->
- {ok, {0,0}, Stream};
-write_data(#write_stream{bytes_remaining=0} = Stream, Bin) ->
- #write_stream {
- fd = Fd,
- current_pos = CurrentPos,
- next_alloc = NextAlloc,
- min_alloc = MinAlloc
- }= Stream,
-
- NewSize = lists:max([MinAlloc, NextAlloc, size(Bin)]),
- % no space in the current segment, must alloc a new segment
- {ok, NewPos} = couch_file:expand(Fd, NewSize + ?FILE_POINTER_BYTES + ?STREAM_OFFSET_BYTES),
-
- case CurrentPos of
- 0 ->
- ok;
- _ ->
- ok = couch_file:pwrite(Fd, CurrentPos, <<NewPos:(?FILE_POINTER_BITS), NewSize:(?STREAM_OFFSET_BITS)>>)
- end,
- Stream2 = Stream#write_stream{
- current_pos=NewPos,
- bytes_remaining=NewSize,
- next_alloc=0},
- write_data(Stream2, Bin);
-write_data(#write_stream{fd=Fd, current_pos=Pos, bytes_remaining=BytesRemaining} = Stream, Bin) ->
- BytesToWrite = lists:min([size(Bin), BytesRemaining]),
- {WriteBin, Rest} = split_binary(Bin, BytesToWrite),
- ok = couch_file:pwrite(Fd, Pos, WriteBin),
- Stream2 = Stream#write_stream{
- bytes_remaining=BytesRemaining - BytesToWrite,
- current_pos=Pos + BytesToWrite
- },
- {ok, _, Stream3} = write_data(Stream2, Rest),
- {ok, {Pos, BytesRemaining}, Stream3}.
-
%%% Tests %%%
-
-test(Term) ->
- {ok, Fd} = couch_file:open("foo", [write]),
- {ok, Stream} = open({0,0}, Fd),
- {ok, Pos} = write_term(Stream, Term),
- {ok, Pos2} = write_term(Stream, {Term, Term}),
- close(Stream),
+%% Test helper: read every chunk at PosList and concatenate them into one
+%% binary. The fold nests each chunk in front of the accumulator
+%% ([Chunk, Acc]), so the chunk read last comes first in the output. For a
+%% most-recent-first PosList, as returned by close/1, this yields write order.
+read_all(Fd, PosList) ->
+ iolist_to_binary(foldl(Fd, PosList,
+ fun(Bin, Acc) ->
+ [Bin, Acc]
+ end, [])).
+
+
+%% Ad-hoc smoke test for couch_file header/append handling and for this
+%% module's write/close/read path. Creates (overwriting) the file at the
+%% relative path "foo" and does not delete it. Returns PosList2.
+test() ->
+ {ok, Fd} = couch_file:open("foo", [create,overwrite]),
+ ok = couch_file:write_header(Fd, {howdy, howdy}),
+ Bin = <<"damienkatz">>,
+ {ok, Pos} = couch_file:append_binary(Fd, Bin),
+ {ok, Bin} = couch_file:pread_binary(Fd, Pos),
+ % the header written above must still be readable after appending data
+ {ok, {howdy, howdy}} = couch_file:read_header(Fd),
+ % a newer header must be the one read back
+ ok = couch_file:write_header(Fd, {foo, foo}),
+ {ok, {foo, foo}} = couch_file:read_header(Fd),
+
+ % both writes fit within the default max_buffer, so close/1 does the append
+ {ok, Stream} = open(Fd),
+ ok = write(Stream, <<"food">>),
+ ok = write(Stream, <<"foob">>),
+ {PosList, 8} = close(Stream),
+ <<"foodfoob">> = read_all(Fd, PosList),
+ {ok, Stream2} = open(Fd),
+ OneBits = <<1:(8*10)>>,
+ ZeroBits = <<0:(8*10)>>,
+ ok = write(Stream2, OneBits),
+ ok = write(Stream2, ZeroBits),
+ {PosList2, 20} = close(Stream2),
+ AllBits = iolist_to_binary([OneBits,ZeroBits]),
+ AllBits = read_all(Fd, PosList2),
 couch_file:close(Fd),
- {ok, Fd2} = couch_file:open("foo", [read, write]),
- {ok, Stream2} = open({0,0}, Fd2),
- {ok, Term1} = read_term(Fd2, Pos),
- io:format("Term1: ~w ~n",[Term1]),
- {ok, Term2} = read_term(Fd2, Pos2),
- io:format("Term2: ~w ~n",[Term2]),
- {ok, PointerList} = deep_write_test(Stream2, Term, 1000, []),
- deep_read_test(Fd2, PointerList),
- close(Stream2),
- couch_file:close(Fd2).
+ PosList2.
-deep_read_test(_Fd, []) ->
- ok;
-deep_read_test(Fd, [Pointer | RestPointerList]) ->
- {ok, _Term} = read_term(Fd, Pointer),
- deep_read_test(Fd, RestPointerList).
-
-deep_write_test(_Stream, _Term, 0, PointerList) ->
- {ok, PointerList};
-deep_write_test(Stream, Term, N, PointerList) ->
- WriteList = lists:duplicate(random:uniform(N), Term),
- {ok, Pointer} = write_term(Stream, WriteList),
- deep_write_test(Stream, Term, N-1, [Pointer | PointerList]).
Modified: couchdb/branches/tail_header/src/couchdb/couch_view_group.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/tail_header/src/couchdb/couch_view_group.erl?rev=775761&r1=775759&r2=775761&view=diff
==============================================================================
--- couchdb/branches/tail_header/src/couchdb/couch_view_group.erl (original)
+++ couchdb/branches/tail_header/src/couchdb/couch_view_group.erl Sun May 17 21:31:17 2009
@@ -205,7 +205,7 @@
if CommittedSeq >= Group#group.current_seq ->
% save the header
Header = {Group#group.sig, get_index_header_data(Group)},
- ok = couch_file:write_header(Group#group.fd, <<$r, $c, $k, 0>>, Header),
+ ok = couch_file:write_header(Group#group.fd, Header),
{noreply, State#group_state{waiting_commit=false}};
true ->
% We can't commit the header because the database seq that's fully
@@ -313,7 +313,7 @@
if ForceReset ->
{ok, reset_file(Db, Fd, DbName, Group)};
true ->
- case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
+ case (catch couch_file:read_header(Fd)) of
{ok, {Sig, HeaderInfo}} ->
% sigs match!
{ok, init_group(Db, Fd, Group, HeaderInfo)};
@@ -417,7 +417,7 @@
+%% Discard the index file's contents (truncate to 0), write a fresh
+%% {Sig, nil} header and re-initialise the group from reset_group/1.
reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
 ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
 ok = couch_file:truncate(Fd, 0),
- ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),
+ ok = couch_file:write_header(Fd, {Sig, nil}),
 init_group(Db, Fd, reset_group(Group), nil).
delete_index_file(RootDir, DbName, GroupId) ->