You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ni...@apache.org on 2014/02/25 21:43:01 UTC
[2/3] couchdb commit: updated refs/heads/1956-attachment-handling to
9bda4f6
Allow chunked transfer for attachments, and make length optional
Use couch_httpd:recv_chunked to read request data. Run it in a newly
spawned process that waits for an acknowledgement after each chunk, so
the next chunk is only read once the previous one has been consumed.
Change couch_db:flush_att/2 to write until a body_end token when using
recv_chunked, making attachment length unnecessary.
Simplify couch_db:write_streamed_attachment/3 by removing read_next_chunk,
as the data function passed to it can no longer have arity zero.
Tweaks to couch_doc to cope with body_end token.
Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/f632df7a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/f632df7a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/f632df7a
Branch: refs/heads/1956-attachment-handling
Commit: f632df7a80fe0bf496b80b82e31b73039856d17d
Parents: 5898925
Author: NickNorth <No...@gmail.com>
Authored: Tue Feb 25 20:28:32 2014 +0000
Committer: NickNorth <No...@gmail.com>
Committed: Tue Feb 25 20:40:10 2014 +0000
----------------------------------------------------------------------
src/couchdb/couch_db.erl | 24 ++++++++++++++++--------
src/couchdb/couch_doc.erl | 5 ++++-
src/couchdb/couch_httpd_db.erl | 29 +++++++++++++++++++++--------
3 files changed, 41 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb/blob/f632df7a/src/couchdb/couch_db.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 11ea0fd..3bc30ca 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -927,7 +927,12 @@ flush_att(Fd, #att{data=Data}=Att) when is_binary(Data) ->
couch_stream:write(OutputStream, Data)
end);
-flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun) ->
+flush_att(Fd, #att{data=Fun}=Att) when is_function(Fun, 0) ->
+ with_stream(Fd, Att, fun(OutputStream) ->
+ write_to_body_end(OutputStream, Fun)
+ end);
+
+flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun, 3) ->
MaxChunkSize = list_to_integer(
couch_config:get("couchdb", "attachment_stream_buffer_size", "4096")),
with_stream(Fd, Att, fun(OutputStream) ->
@@ -951,7 +956,7 @@ flush_att(Fd, #att{data=Fun,att_len=undefined}=Att) when is_function(Fun) ->
end, ok)
end);
-flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun) ->
+flush_att(Fd, #att{data=Fun,att_len=AttLen}=Att) when is_function(Fun, 1) ->
with_stream(Fd, Att, fun(OutputStream) ->
write_streamed_attachment(OutputStream, Fun, AttLen)
end).
@@ -1038,19 +1043,22 @@ with_stream(Fd, #att{md5=InMd5,type=Type,encoding=Enc}=Att, Fun) ->
encoding=NewEnc
}.
+write_to_body_end(Stream, F) ->
+ case F() of
+ {body_end, _} ->
+ ok;
+ {bytes, Bin} ->
+ ok = couch_stream:write(Stream, Bin),
+ write_to_body_end(Stream, F)
+ end.
write_streamed_attachment(_Stream, _F, 0) ->
ok;
write_streamed_attachment(Stream, F, LenLeft) when LenLeft > 0 ->
- Bin = read_next_chunk(F, LenLeft),
+ Bin = F(lists:min([LenLeft, 16#2000])),
ok = couch_stream:write(Stream, Bin),
write_streamed_attachment(Stream, F, LenLeft - size(Bin)).
-read_next_chunk(F, _) when is_function(F, 0) ->
- F();
-read_next_chunk(F, LenLeft) when is_function(F, 1) ->
- F(lists:min([LenLeft, 16#2000])).
-
enum_docs_since_reduce_to_count(Reds) ->
couch_btree:final_reduce(
fun couch_db_updater:btree_by_seq_reduce/2, Reds).
http://git-wip-us.apache.org/repos/asf/couchdb/blob/f632df7a/src/couchdb/couch_doc.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_doc.erl b/src/couchdb/couch_doc.erl
index 4047370..234f494 100644
--- a/src/couchdb/couch_doc.erl
+++ b/src/couchdb/couch_doc.erl
@@ -581,7 +581,7 @@ doc_from_multi_part_stream(ContentType, DataFun) ->
% replace with function that reads the data from MIME stream.
ReadAttachmentDataFun = fun() ->
Parser ! {get_bytes, Ref, self()},
- receive {bytes, Ref, Bytes} -> Bytes end
+ receive {Kind, Ref, Bytes} -> {Kind, Bytes} end
end,
Atts2 = lists:map(
fun(#att{data=follows}=A) ->
@@ -623,6 +623,9 @@ mp_parse_atts({body, Bytes}) ->
end,
fun mp_parse_atts/1;
mp_parse_atts(body_end) ->
+ receive {get_bytes, Ref, From} ->
+ From ! {body_end, Ref, <<>>}
+ end,
fun mp_parse_atts/1.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/f632df7a/src/couchdb/couch_httpd_db.erl
----------------------------------------------------------------------
diff --git a/src/couchdb/couch_httpd_db.erl b/src/couchdb/couch_httpd_db.erl
index 6a78b13..6067d74 100644
--- a/src/couchdb/couch_httpd_db.erl
+++ b/src/couchdb/couch_httpd_db.erl
@@ -699,14 +699,27 @@ send_ranges_multipart(Req, ContentType, Len, Att, Ranges) ->
{ok, Resp}.
receive_request_data(Req) ->
- receive_request_data(Req, couch_httpd:body_length(Req)).
-
-receive_request_data(Req, LenLeft) when LenLeft > 0 ->
- Len = erlang:min(4096, LenLeft),
- Data = couch_httpd:recv(Req, Len),
- {Data, fun() -> receive_request_data(Req, LenLeft - iolist_size(Data)) end};
-receive_request_data(_Req, _) ->
- throw(<<"expected more data">>).
+ Parent = self(),
+ Ref = make_ref(),
+ Receiver = spawn_link(fun() ->
+ couch_httpd:recv_chunked(Req, 4096, fun
+ ({_Len, Data}, _State) ->
+ Parent ! {chunked_bytes, Ref, Data},
+ receive
+ ok ->
+ null
+ end
+ end, null),
+ unlink(Parent)
+ end),
+ receive_stream_data(Receiver, Ref).
+
+receive_stream_data(Receiver, Ref) ->
+ receive
+ {chunked_bytes, Ref, Data} ->
+ Receiver ! ok,
+ {Data, fun() -> receive_stream_data(Receiver, Ref) end}
+ end.
make_content_range(From, To, Len) ->
io_lib:format("bytes ~B-~B/~B", [From, To, Len]).