You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2008/06/12 23:32:20 UTC

svn commit: r667236 - in /incubator/couchdb/trunk/src/couchdb: couch_file.erl couch_view.erl

Author: damien
Date: Thu Jun 12 14:32:20 2008
New Revision: 667236

URL: http://svn.apache.org/viewvc?rev=667236&view=rev
Log:
fix for problem when view index header data exceeds 2k.

Modified:
    incubator/couchdb/trunk/src/couchdb/couch_file.erl
    incubator/couchdb/trunk/src/couchdb/couch_view.erl

Modified: incubator/couchdb/trunk/src/couchdb/couch_file.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_file.erl?rev=667236&r1=667235&r2=667236&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_file.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_file.erl Thu Jun 12 14:32:20 2008
@@ -93,7 +93,19 @@
 %%----------------------------------------------------------------------
 
 append_term(Fd, Term) ->
-    gen_server:call(Fd, {append_term, Term}).
+    append_binary(Fd, term_to_binary(Term, [compressed])).
+
+
+%%----------------------------------------------------------------------
+%% Purpose: To append an Erlang binary to the end of the file.
+%% Args:    Erlang term to serialize and append to the file.
+%% Returns: {ok, Pos} where Pos is the file offset to the beginning the
+%%  serialized  term. Use pread_term to read the term back.
+%%  or {error, Reason}.
+%%----------------------------------------------------------------------
+    
+append_binary(Fd, Bin) ->
+    gen_server:call(Fd, {append_bin, Bin}).
 
 
 %%----------------------------------------------------------------------
@@ -104,7 +116,18 @@
 %%----------------------------------------------------------------------
 
 pread_term(Fd, Pos) ->
-    gen_server:call(Fd, {pread_term, Pos}).
+    {ok, Bin} = pread_binary(Fd, Pos),
+    {ok, binary_to_term(Bin)}.
+
+%%----------------------------------------------------------------------
+%% Purpose: Reads a binrary from a file that was written with append_binary
+%% Args:    Pos, the offset into the file where the term is serialized.
+%% Returns: {ok, Term}
+%%  or {error, Reason}.
+%%----------------------------------------------------------------------
+
+pread_binary(Fd, Pos) ->
+    gen_server:call(Fd, {pread_bin, Pos}).
 
 
 %%----------------------------------------------------------------------
@@ -144,30 +167,35 @@
 
 
 write_header(Fd, Prefix, Data) ->
-    ok = sync(Fd),
     TermBin = term_to_binary(Data),
     % the size of all the bytes written to the header, including the md5 signature (16 bytes)
     FilledSize = size(Prefix) + size(TermBin) + 16,
+    {TermBin2, FilledSize2} =
     case FilledSize > ?HEADER_SIZE of
     true ->
         % too big!
-        {error, error_header_too_large};
+        {ok, Pos} = append_binary(Fd, TermBin),
+        PtrBin = term_to_binary({pointer_to_header_data, Pos}),
+        {PtrBin, size(Prefix) + size(PtrBin) + 16};
     false ->
-        % pad out the header with zeros, then take the md5 hash
-        PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize))>>,
-        Sig = erlang:md5([TermBin, PadZeros]),
-        % now we assemble the final header binary and write to disk
-        WriteBin = <<Prefix/binary, TermBin/binary, PadZeros/binary, Sig/binary>>,
-        ?HEADER_SIZE = size(WriteBin), % sanity check
-        DblWriteBin = [WriteBin, WriteBin],
-        ok = pwrite(Fd, 0, DblWriteBin),
-        ok = sync(Fd)
-    end.
+        {TermBin, FilledSize}
+    end,
+    ok = sync(Fd),
+    % pad out the header with zeros, then take the md5 hash
+    PadZeros = <<0:(8*(?HEADER_SIZE - FilledSize2))>>,
+    Sig = erlang:md5([TermBin2, PadZeros]),
+    % now we assemble the final header binary and write to disk
+    WriteBin = <<Prefix/binary, TermBin2/binary, PadZeros/binary, Sig/binary>>,
+    ?HEADER_SIZE = size(WriteBin), % sanity check
+    DblWriteBin = [WriteBin, WriteBin],
+    ok = pwrite(Fd, 0, DblWriteBin),
+    ok = sync(Fd).
 
 
 read_header(Fd, Prefix) ->
     {ok, Bin} = couch_file:pread(Fd, 0, 2*(?HEADER_SIZE)),
     <<Bin1:(?HEADER_SIZE)/binary, Bin2:(?HEADER_SIZE)/binary>> = Bin,
+    Result =
     % read the first header
     case extract_header(Prefix, Bin1) of
     {ok, Header1} ->
@@ -200,9 +228,14 @@
             % return the error, no need to log anything as the caller will be responsible for dealing with the error.
             {error, Error}
         end
+    end,
+    case Result of
+    {ok, {pointer_to_header_data, Ptr}} ->
+        pread_term(Fd, Ptr);
+    _ ->
+        Result
     end.
-
-
+    
 extract_header(Prefix, Bin) ->
     SizeOfPrefix = size(Prefix),
     SizeOfTermBin = ?HEADER_SIZE -
@@ -300,17 +333,16 @@
 handle_call({truncate, Pos}, _From, Fd) ->
     {ok, Pos} = file:position(Fd, Pos),
     {reply, file:truncate(Fd), Fd};
-handle_call({append_term, Term}, _From, Fd) ->
-    Bin = term_to_binary(Term, [compressed]),
-    TermLen = size(Bin),
-    Bin2 = <<TermLen:32, Bin/binary>>,
+handle_call({append_bin, Bin}, _From, Fd) ->
+    Len = size(Bin),
+    Bin2 = <<Len:32, Bin/binary>>,
     {ok, Pos} = file:position(Fd, eof),
     {reply, {file:pwrite(Fd, Pos, Bin2), Pos}, Fd};
-handle_call({pread_term, Pos}, _From, Fd) ->
+handle_call({pread_bin, Pos}, _From, Fd) ->
     {ok, <<TermLen:32>>}
         = file:pread(Fd, Pos, 4),
     {ok, Bin} = file:pread(Fd, Pos + 4, TermLen),
-    {reply, {ok, binary_to_term(Bin)}, Fd}.
+    {reply, {ok, Bin}, Fd}.
 
 
 handle_cast(close, Fd) ->

Modified: incubator/couchdb/trunk/src/couchdb/couch_view.erl
URL: http://svn.apache.org/viewvc/incubator/couchdb/trunk/src/couchdb/couch_view.erl?rev=667236&r1=667235&r2=667236&view=diff
==============================================================================
--- incubator/couchdb/trunk/src/couchdb/couch_view.erl (original)
+++ incubator/couchdb/trunk/src/couchdb/couch_view.erl Thu Jun 12 14:32:20 2008
@@ -22,12 +22,12 @@
 -include("couch_db.hrl").
 
 -record(group,
-    {db,
-    fd,
+    {sig=nil,
+    db=nil,
+    fd=nil,
     name,
     def_lang,
     views,
-    reductions=[], % list of reduction names and id_num of view that contains it.
     id_btree=nil,
     current_seq=0,
     query_server=nil
@@ -189,7 +189,8 @@
             {View#view{id_num=N},N+1}
         end, 0, dict:to_list(DictBySrc)),
     
-    reset_group(#group{name=Id, views=Views, def_lang=Language}).
+    Group = #group{name=Id, views=Views, def_lang=Language},
+    Group#group{sig=erlang:md5(term_to_binary(Group))}.
     
 
 
@@ -331,7 +332,7 @@
             current_seq=0,
             def_lang=Lang,
             id_btree=nil},
-        Group2 = disk_group_to_mem(Db, Fd, Group),
+        Group2 = init_group(Db, Fd, Group,nil),
         temp_update_loop(Group2, NotifyPids);
     Else ->
         exit(Else)
@@ -355,7 +356,7 @@
     start_update_loop(RootDir, DbName, GroupId, get_notify_pids(1000)).
     
 start_update_loop(RootDir, DbName, GroupId, NotifyPids) ->
-    {Db, DbGroup} =
+    {Db, Group} =
     case (catch couch_server:open(DbName)) of
     {ok, Db0} ->
         case (catch couch_db:open_doc(Db0, GroupId)) of
@@ -370,38 +371,37 @@
  	    exit(Else)
  	end,
  	FileName = RootDir ++ "/." ++ DbName ++ GroupId ++".view",
- 	Group =
+ 	Group2 =
     case couch_file:open(FileName) of
     {ok, Fd} ->
+        Sig = Group#group.sig,
         case (catch couch_file:read_header(Fd, <<$r, $c, $k, 0>>)) of
-        {ok, ExistingDiskGroup} ->
-            % validate all the view definitions in the index are correct.
-            case reset_group(ExistingDiskGroup) == reset_group(DbGroup) of
-            true  -> disk_group_to_mem(Db, Fd, ExistingDiskGroup);
-            false -> reset_file(Db, Fd, DbName, DbGroup)
-            end;
+        {ok, {Sig, HeaderInfo}} ->
+            % sigs match!
+            init_group(Db, Fd, Group, HeaderInfo);
         _ ->
-            reset_file(Db, Fd, DbName, DbGroup)
+            reset_file(Db, Fd, DbName, Group)
         end;
     {error, enoent} ->
         case couch_file:open(FileName, [create]) of
-        {ok, Fd} -> reset_file(Db, Fd, DbName, DbGroup);
+        {ok, Fd} -> reset_file(Db, Fd, DbName, Group);
         Error    -> throw(Error)
         end
     end,
     
-    update_loop(RootDir, DbName, GroupId, Group, NotifyPids).
+    update_loop(RootDir, DbName, GroupId, Group2, NotifyPids).
 
-reset_file(Db, Fd, DbName, #group{name=Name} = DiskReadyGroup) ->
+reset_file(Db, Fd, DbName, #group{sig=Sig,name=Name} = Group) ->
     ?LOG_DEBUG("Reseting group index \"~s\" in db ~s", [Name, DbName]),
     ok = couch_file:truncate(Fd, 0),
-    ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, DiskReadyGroup),
-    disk_group_to_mem(Db, Fd, DiskReadyGroup).
+    ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, {Sig, nil}),
+    init_group(Db, Fd, reset_group(Group), nil).
 
-update_loop(RootDir, DbName, GroupId, #group{fd=Fd}=Group, NotifyPids) ->
+update_loop(RootDir, DbName, GroupId, #group{sig=Sig,fd=Fd}=Group, NotifyPids) ->
     try update_group(Group) of
     {ok, Group2} ->    
-        ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, mem_group_to_disk(Group2)),
+        HeaderData = {Sig, get_index_header_data(Group2)},
+        ok = couch_file:write_header(Fd, <<$r, $c, $k, 0>>, HeaderData),
         [Pid ! {self(), {ok, Group2}} || Pid <- NotifyPids],
         garbage_collect(),
         update_loop(RootDir, DbName, GroupId, Group2, get_notify_pids(100000))
@@ -475,11 +475,13 @@
 delete_index_file(RootDir, DbName, GroupId) ->
     file:delete(RootDir ++ "/." ++ DbName ++ GroupId ++ ".view").
 
-% Given a disk ready group structure, return an initialized, in-memory version.
-disk_group_to_mem(Db, Fd, #group{id_btree=IdState,def_lang=Lang,views=Views}=Group) ->
-    {ok, IdBtree} = couch_btree:open(IdState, Fd),
-    Views2 = lists:map(
-        fun(#view{btree=BtreeState,reduce_funs=RedFuns}=View) ->
+init_group(Db, Fd, #group{views=Views}=Group, nil = _IndexHeaderData) ->
+    init_group(Db, Fd, Group, {0, nil, [nil || _ <- Views]});
+init_group(Db, Fd, #group{def_lang=Lang,views=Views}=Group,
+        {Seq, IdBtreeState, ViewStates} = _IndexHeaderData) ->
+    {ok, IdBtree} = couch_btree:open(IdBtreeState, Fd),
+    Views2 = lists:zipwith(
+        fun(BtreeState, #view{btree=BtreeState,reduce_funs=RedFuns}=View) ->
             FunSrcs = [FunSrc || {_Name, FunSrc} <- RedFuns],
             ReduceFun = 
                 fun(reduce, KVs) ->
@@ -495,18 +497,13 @@
                         [{less, fun less_json/2},{reduce, ReduceFun}]),
             View#view{btree=Btree}
         end,
-        Views),
-    Group#group{db=Db, fd=Fd, id_btree=IdBtree, views=Views2}.
-    
-% Given an initialized, in-memory group structure, return a disk ready version.
-mem_group_to_disk(#group{id_btree=IdBtree,views=Views}=Group) ->
-    Views2 = lists:map(
-        fun(#view{btree=Btree}=View) ->
-            State = couch_btree:get_state(Btree),
-            View#view{btree=State}
-        end,
-        Views),
-    Group#group{db=nil, fd=nil, id_btree=couch_btree:get_state(IdBtree), views=Views2}.
+        ViewStates, Views),
+    Group#group{db=Db, fd=Fd, current_seq=Seq, id_btree=IdBtree, views=Views2}.
+
+
+get_index_header_data(#group{current_seq=Seq,id_btree=IdBtree,views=Views}) ->
+    ViewStates = [couch_btree:get_state(Btree) || #view{btree=Btree} <- Views],
+    {Seq, couch_btree:get_state(IdBtree), ViewStates}.
 
 
 
@@ -582,7 +579,7 @@
         end
     end.
 
-process_doc(Db, DocInfo, {Docs, #group{name=GroupId}=Group, ViewKVs, DocIdViewIdKeys, _LastSeq}) ->
+process_doc(Db, DocInfo, {Docs, #group{sig=Sig,name=GroupId}=Group, ViewKVs, DocIdViewIdKeys, _LastSeq}) ->
     % This fun computes once for each document
     #doc_info{id=DocId, update_seq=Seq, deleted=Deleted} = DocInfo,
     case DocId of
@@ -591,11 +588,11 @@
         % anything in the definition changed.
         case couch_db:open_doc(Db, DocInfo) of
         {ok, Doc} ->
-            case design_doc_to_view_group(Doc) == reset_group(Group) of
-            true ->
-                % nothing changed, keeping on computing
+            case design_doc_to_view_group(Doc) of
+            #group{sig=Sig} ->
+                % The same md5 signature, keep on computing
                 {ok, {Docs, Group, ViewKVs, DocIdViewIdKeys, Seq}};
-            false ->
+            _ ->
                 throw(restart)
             end;
         {not_found, deleted} ->