You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/10/05 11:54:17 UTC
svn commit: r1004596 - in /couchdb/branches/new_replicator/src/couchdb:
couch_api_wrap.erl couch_api_wrap_httpc.erl
Author: fdmanana
Date: Tue Oct 5 09:54:17 2010
New Revision: 1004596
URL: http://svn.apache.org/viewvc?rev=1004596&view=rev
Log:
New replicator: do streaming when sending documents through the bulk docs API.
Modified:
couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl
Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=1004596&r1=1004595&r2=1004596&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Tue Oct 5 09:54:17 2010
@@ -279,17 +279,33 @@ update_docs(Db, DocList, Options) ->
update_docs(Db, DocList, Options, interactive_edit).
update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
- DocList1 = [couch_doc:to_json_obj(Doc, [revs]) || Doc <- DocList],
- Body = case UpdateType of
+ FullCommit = atom_to_list(not lists:member(delay_commit, Options)),
+ Part1 = case UpdateType of
replicated_changes ->
- {[{new_edits, false}, {docs, DocList1}]};
+ <<"{\"new_edits\":false,\"docs\":[">>;
interactive_edit ->
- {[{docs, DocList1}]}
+ <<"{\"docs\":[">>
+ end,
+ BodyFun = fun(eof) ->
+ eof;
+ ([]) ->
+ {ok, <<"]}">>, eof};
+ ([Part | RestParts]) when is_binary(Part) ->
+ {ok, Part, RestParts};
+ ([Doc | RestParts]) ->
+ DocJson = couch_doc:to_json_obj(Doc, [revs, attachments]),
+ Data = case RestParts of
+ [] ->
+ ?JSON_ENCODE(DocJson);
+ _ ->
+ [?JSON_ENCODE(DocJson), ","]
+ end,
+ {ok, Data, RestParts}
end,
- FullCommit = atom_to_list(not lists:member(delay_commit, Options)),
send_req(
HttpDb,
- [{method, post}, {path, "_bulk_docs"}, {body, ?JSON_ENCODE(Body)},
+ [{method, post}, {path, "_bulk_docs"},
+ {body, {chunkify, BodyFun, [Part1 | DocList]}},
{headers, [
{"X-Couch-Full-Commit", FullCommit},
{"Content-Type", "application/json"} ]}],
Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl?rev=1004596&r1=1004595&r2=1004596&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl Tue Oct 5 09:54:17 2010
@@ -32,8 +32,14 @@ httpdb_setup(#httpdb{} = Db) ->
send_req(#httpdb{headers = BaseHeaders} = HttpDb, Params, Callback) ->
Method = get_value(method, Params, get),
- Headers = get_value(headers, Params, []),
- Body = get_value(body, Params, []),
+ Headers1 = get_value(headers, Params, []),
+ {Body, Headers} = case get_value(body, Params, []) of
+ {chunkify, BodyFun, Acc0} ->
+ NewBodyFun = chunkify_fun(BodyFun),
+ {{NewBodyFun, Acc0}, [{"Transfer-Encoding", "chunked"} | Headers1]};
+ Else ->
+ {Else, Headers1}
+ end,
IbrowseOptions = [
{response_format, binary}, {inactivity_timeout, HttpDb#httpdb.timeout}
| get_value(ibrowse_options, Params, []) ++ HttpDb#httpdb.proxy_options
@@ -208,3 +214,21 @@ redirect_url(RespHeaders, OrigUrl) ->
after_redirect(RedirectUrl, HttpDb, Params) ->
Params2 = lists:keydelete(path, 1, lists:keydelete(qs, 1, Params)),
{HttpDb#httpdb{url = RedirectUrl}, Params2}.
+
+
+chunkify_fun(BodyFun) ->
+ fun(eof_body_fun) ->
+ eof;
+ (Acc) ->
+ case BodyFun(Acc) of
+ eof ->
+ {ok, <<"0\r\n\r\n">>, eof_body_fun};
+ {ok, Data, NewAcc} ->
+ DataBin = iolist_to_binary(Data),
+ Chunk = [hex_size(DataBin), "\r\n", DataBin, "\r\n"],
+ {ok, iolist_to_binary(Chunk), NewAcc}
+ end
+ end.
+
+hex_size(Bin) ->
+ hd(io_lib:format("~.16B", [size(Bin)])).