You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by jc...@apache.org on 2009/02/14 01:17:45 UTC

svn commit: r744309 - in /couchdb/trunk/src: couchdb/couch_db.erl couchdb/couch_httpd.erl mochiweb/mochiweb_request.erl

Author: jchris
Date: Sat Feb 14 00:17:45 2009
New Revision: 744309

URL: http://svn.apache.org/viewvc?rev=744309&view=rev
Log:
enhance the Mochiweb streaming api based on feedback from Bob Ippolito

Modified:
    couchdb/trunk/src/couchdb/couch_db.erl
    couchdb/trunk/src/couchdb/couch_httpd.erl
    couchdb/trunk/src/mochiweb/mochiweb_request.erl

Modified: couchdb/trunk/src/couchdb/couch_db.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_db.erl?rev=744309&r1=744308&r2=744309&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_db.erl (original)
+++ couchdb/trunk/src/couchdb/couch_db.erl Sat Feb 14 00:17:45 2009
@@ -441,8 +441,7 @@
                 {ok, StreamPointer} = couch_stream:write(OutputStream, Bin),
                 {Fd, StreamPointer, size(Bin)};
             {StreamFun, undefined} when is_function(StreamFun) ->
-                % we will throw an error if the client 
-                % sends a chunk larger than this size
+                % max_attachment_chunk_size controls the max chunk size we buffer in memory
                 MaxChunkSize = list_to_integer(couch_config:get("couchdb", 
                     "max_attachment_chunk_size","4294967296")),
                 WriterFun = make_writer_fun(OutputStream),

Modified: couchdb/trunk/src/couchdb/couch_httpd.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd.erl?rev=744309&r1=744308&r2=744309&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd.erl Sat Feb 14 00:17:45 2009
@@ -264,7 +264,7 @@
     % Fun is called once with each chunk
     % Fun({Length, Binary}, State)
     % called with Length == 0 on the last time.
-    MochiReq:recv_body(MaxChunkSize, ChunkFun, InitState).
+    MochiReq:stream_body(MaxChunkSize, ChunkFun, InitState).
 
 body(#httpd{mochi_req=MochiReq}) ->
     % Maximum size of document PUT request body (4GB)

Modified: couchdb/trunk/src/mochiweb/mochiweb_request.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/mochiweb/mochiweb_request.erl?rev=744309&r1=744308&r2=744309&view=diff
==============================================================================
--- couchdb/trunk/src/mochiweb/mochiweb_request.erl (original)
+++ couchdb/trunk/src/mochiweb/mochiweb_request.erl Sat Feb 14 00:17:45 2009
@@ -12,7 +12,7 @@
 -define(READ_SIZE, 8192).
 
 -export([get_header_value/1, get_primary_header_value/1, get/1, dump/0]).
--export([send/1, recv/1, recv/2, recv_body/0, recv_body/1, recv_body/3]).
+-export([send/1, recv/1, recv/2, recv_body/0, recv_body/1, stream_body/3]).
 -export([start_response/1, start_response_length/1, start_raw_response/1]).
 -export([respond/1, ok/1]).
 -export([not_found/0, not_found/1]).
@@ -171,39 +171,54 @@
 %% @doc Receive the body of the HTTP request (defined by Content-Length).
 %%      Will receive up to MaxBody bytes.
 recv_body(MaxBody) ->
-    recv_body(MaxBody, nil, nil).
+    % we could use a sane constant for max chunk size
+    Body = stream_body(?MAX_RECV_BODY, fun
+        ({0, _ChunkedFooter}, {_LengthAcc, BinAcc}) -> 
+            iolist_to_binary(lists:reverse(BinAcc));
+        ({Length, Bin}, {LengthAcc, BinAcc}) ->
+            NewLength = Length + LengthAcc,
+            if NewLength > MaxBody ->
+                exit({body_too_large, chunked});
+            true -> 
+                {NewLength, [Bin | BinAcc]}
+            end
+        end, {0, []}, ?MAX_RECV_BODY),
+    put(?SAVE_BODY, Body),
+    Body.
+
+stream_body(MaxChunkSize, ChunkFun, FunState) ->
+    stream_body(MaxChunkSize, ChunkFun, FunState, undefined).
+    
+stream_body(MaxChunkSize, ChunkFun, FunState, MaxBodyLength) ->
 
-recv_body(MaxBody, ChunkFun, ChunkAcc) ->
     case get_header_value("expect") of
         "100-continue" ->
             start_raw_response({100, gb_trees:empty()});
         _Else ->
             ok
     end,
-    Body = case body_length() of
-               undefined ->
-                   undefined;
-               {unknown_transfer_encoding, Unknown} ->
-                   exit({unknown_transfer_encoding, Unknown});
-               chunked ->
-                   case ChunkFun of
-                        nil ->
-                            read_chunked_body(MaxBody);
-                        _StreamFun ->
-                            % In this case the MaxBody is actually used to
-                            % determine the maximum allowed size of a single
-                            % chunk.
-                            stream_chunked_body(MaxBody, ChunkFun, ChunkAcc)
-                    end;
-               0 ->
-                   <<>>;
-               Length when is_integer(Length), Length =< MaxBody ->
-                   recv(Length);
-               Length ->
-                   exit({body_too_large, Length})
-           end,
-    put(?SAVE_BODY, Body),
-    Body.
+    case body_length() of
+        undefined ->
+            undefined;
+        {unknown_transfer_encoding, Unknown} ->
+            exit({unknown_transfer_encoding, Unknown});
+        chunked ->
+            % In this case MaxChunkSize bounds how much of a chunk
+            % is buffered at once; a larger chunk is handed to
+            % ChunkFun in pieces of at most MaxChunkSize bytes.
+            stream_chunked_body(MaxChunkSize, ChunkFun, FunState);
+        0 ->
+            <<>>;
+        Length when is_integer(Length) ->
+            case MaxBodyLength of
+            MaxBodyLength when is_integer(MaxBodyLength), MaxBodyLength < Length ->
+                exit({body_too_large, content_length});
+            _ ->
+                stream_unchunked_body(Length, MaxChunkSize, ChunkFun, FunState)
+            end;     
+        Length ->
+            exit({length_not_integer, Length})
+    end.
 
 
 %% @spec start_response({integer(), ioheaders()}) -> response()
@@ -419,14 +434,6 @@
             Cached
     end.
 
-read_chunked_body(MaxBufferSize) ->
-    stream_chunked_body(MaxBufferSize, fun
-        ({0, _}, Acc) ->
-            iolist_to_binary(lists:reverse(Acc));
-        ({_Length, Bin}, Acc) ->
-            [Bin | Acc]
-    end, []).
-
 %% @spec stream_chunked_body(integer(), fun(), term()) -> term()
 %% @doc The function is called for each chunk.
 %%      Used internally by read_chunked_body.
@@ -435,12 +442,25 @@
         0 ->
             Fun({0, read_chunk(0)}, FunState);
         Length when Length > MaxChunkSize ->
-            exit({body_too_large, chunked});
+            NewState = read_sub_chunks(Length, MaxChunkSize, Fun, FunState),
+            stream_chunked_body(MaxChunkSize, Fun, NewState);
         Length ->
             NewState = Fun({Length, read_chunk(Length)}, FunState),
             stream_chunked_body(MaxChunkSize, Fun, NewState)
     end.
 
+stream_unchunked_body(0, _MaxChunkSize, Fun, FunState) ->
+    Fun({0, <<>>}, FunState);
+stream_unchunked_body(Length, MaxChunkSize, Fun, FunState) when Length > MaxChunkSize ->
+    Bin = recv(MaxChunkSize),
+    NewState = Fun({MaxChunkSize, Bin}, FunState),
+    stream_unchunked_body(Length - MaxChunkSize, MaxChunkSize, Fun, NewState);
+stream_unchunked_body(Length, MaxChunkSize, Fun, FunState) ->
+    Bin = recv(Length),
+    NewState = Fun({Length, Bin}, FunState),
+    stream_unchunked_body(0, MaxChunkSize, Fun, NewState).
+
+
 %% @spec read_chunk_length() -> integer()
 %% @doc Read the length of the next HTTP chunk.
 read_chunk_length() ->
@@ -483,6 +503,14 @@
             exit(normal)
     end.
 
+read_sub_chunks(Length, MaxChunkSize, Fun, FunState) when Length > MaxChunkSize ->
+    Bin = recv(MaxChunkSize),
+    NewState = Fun({size(Bin), Bin}, FunState),
+    read_sub_chunks(Length - MaxChunkSize, MaxChunkSize, Fun, NewState);
+
+read_sub_chunks(Length, _MaxChunkSize, Fun, FunState) ->
+    Fun({Length, read_chunk(Length)}, FunState).
+    
 %% @spec serve_file(Path, DocRoot) -> Response
 %% @doc Serve a file relative to DocRoot.
 serve_file(Path, DocRoot) ->