You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ni...@apache.org on 2014/02/25 21:43:01 UTC

[2/3] couchdb commit: updated refs/heads/1956-attachment-handling to 9bda4f6

Allow chunked transfer for attachments, and make length optional

Use couch_httpd:recv_chunked to read request data. Spawn a new
process for it, to force waiting between chunks.

Change couch_db:flush_att/2 to write until a body_end token when using
recv_chunked, making attachment length unnecessary.

Simplify couch_db:write_streamed_attachment/3 to remove read_next_chunk
as function arity cannot now be zero.

Tweaks to couch_doc to cope with body_end token.


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/f632df7a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/f632df7a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/f632df7a

Branch: refs/heads/1956-attachment-handling
Commit: f632df7a80fe0bf496b80b82e31b73039856d17d
Parents: 5898925
Author: NickNorth <No...@gmail.com>
Authored: Tue Feb 25 20:28:32 2014 +0000
Committer: NickNorth <No...@gmail.com>
Committed: Tue Feb 25 20:40:10 2014 +0000

----------------------------------------------------------------------
 src/couchdb/couch_db.erl       | 24 ++++++++++++++++--------
 src/couchdb/couch_doc.erl      |  5 ++++-
 src/couchdb/couch_httpd_db.erl | 29 +++++++++++++++++++++--------
 3 files changed, 41 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/f632df7a/src/couchdb/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 11ea0fd..3bc30ca 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -927,7 +927,12 @@ flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) ->
         couch_stream:write(OutputStream, Data)
     end);
 
-flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun) ->
+flush_att(Fd, #att{data=Fun}=Att) when is_function(Fun, 0) ->
+    with_stream(Fd, Att, fun(OutputStream) ->
+        write_to_body_end(OutputStream, Fun)
+    end);
+
+flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun, 3) ->
     MaxChunkSize = list_to_integer(
         couch_config:get("couchdb", "attachment_stream_buffer_size", "4096")),
     with_stream(Fd, Att, fun(OutputStream) ->
@@ -951,7 +956,7 @@ flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun) ->
             end, ok)
     end);
 
-flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) ->
+flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun, 1) ->
     with_stream(Fd, Att, fun(OutputStream) ->
         write_streamed_attachment(OutputStream, Fun, AttLen)
     end).
@@ -1038,19 +1043,22 @@ with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) ->
         encoding=NewEnc
     }.
 
+write_to_body_end(Stream, F) ->
+    case F() of
+        {body_end, _} ->
+            ok;
+        {bytes, Bin} ->
+            ok = couch_stream:write(Stream, Bin),
+            write_to_body_end(Stream, F)
+    end.
 
 write_streamed_attachment(_Stream, _F, 0) ->
     ok;
 write_streamed_attachment(Stream, F, LenLeft) when LenLeft > 0 ->
-    Bin = read_next_chunk(F, LenLeft),
+    Bin = F(lists:min([LenLeft, 16#2000])),
     ok = couch_stream:write(Stream, Bin),
     write_streamed_attachment(Stream, F, LenLeft - size(Bin)).
 
-read_next_chunk(F, _) when is_function(F, 0) ->
-    F();
-read_next_chunk(F, LenLeft) when is_function(F, 1) ->
-    F(lists:min([LenLeft, 16#2000])).
-
 enum_docs_since_reduce_to_count(Reds) ->
     couch_btree:final_reduce(
             fun couch_db_updater:btree_by_seq_reduce/2, Reds).

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f632df7a/src/couchdb/couch_doc.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl
index 4047370..234f494 100644
--- a/src/couchdb/couch_doc.erl
+++ b/src/couchdb/couch_doc.erl
@@ -581,7 +581,7 @@ doc_from_multi_part_stream(ContentType, DataFun) ->
         % replace with function that reads the data from MIME stream.
         ReadAttachmentDataFun = fun() ->
             Parser ! {get_bytes, Ref, self()},
-            receive {bytes, Ref, Bytes} -> Bytes end
+            receive {Kind, Ref, Bytes} -> {Kind, Bytes} end
         end,
         Atts2 = lists:map(
             fun(#att{data=follows}=A) ->
@@ -623,6 +623,9 @@ mp_parse_atts({body, Bytes}) ->
     end,
     fun mp_parse_atts/1;
 mp_parse_atts(body_end) ->
+    receive {get_bytes, Ref, From} ->
+        From ! {body_end, Ref, <<>>}
+    end,
     fun mp_parse_atts/1.
 
 

http://git-wip-us.apache.org/repos/asf/couchdb/blob/f632df7a/src/couchdb/couch_httpd_db.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 6a78b13..6067d74 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -699,14 +699,27 @@ send_ranges_multipart(Req, ContentType, Len, Att, Ranges) ->
     {ok, Resp}.
 
 receive_request_data(Req) ->
-    receive_request_data(Req, couch_httpd:body_length(Req)).
-
-receive_request_data(Req, LenLeft) when LenLeft > 0 ->
-    Len = erlang:min(4096, LenLeft),
-    Data = couch_httpd:recv(Req, Len),
-    {Data, fun() -> receive_request_data(Req, LenLeft - iolist_size(Data)) end};
-receive_request_data(_Req, _) ->
-    throw(<<"expected more data">>).
+    Parent = self(),
+    Ref = make_ref(),
+    Receiver = spawn_link(fun() ->
+        couch_httpd:recv_chunked(Req, 4096, fun
+            ({_Len, Data}, _State) ->
+                Parent ! {chunked_bytes, Ref, Data},
+                receive
+                ok ->
+                    null
+                end
+            end, null),
+        unlink(Parent)
+    end),
+    receive_stream_data(Receiver, Ref).
+
+receive_stream_data(Receiver, Ref) ->
+    receive
+    {chunked_bytes, Ref, Data} ->
+        Receiver ! ok,
+        {Data, fun() -> receive_stream_data(Receiver, Ref) end}
+    end.
 
 make_content_range(From, To, Len) ->
     io_lib:format("bytes ~B-~B/~B", [From, To, Len]).