You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/10/05 11:54:17 UTC

svn commit: r1004596 - in /couchdb/branches/new_replicator/src/couchdb: couch_api_wrap.erl couch_api_wrap_httpc.erl

Author: fdmanana
Date: Tue Oct  5 09:54:17 2010
New Revision: 1004596

URL: http://svn.apache.org/viewvc?rev=1004596&view=rev
Log:
New replicator: do streaming when sending documents through the bulk docs API.

Modified:
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=1004596&r1=1004595&r2=1004596&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Tue Oct  5 09:54:17 2010
@@ -279,17 +279,33 @@ update_docs(Db, DocList, Options) ->
     update_docs(Db, DocList, Options, interactive_edit).
 
 update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
-    DocList1 = [couch_doc:to_json_obj(Doc, [revs]) || Doc <- DocList],
-    Body = case UpdateType of
+    FullCommit = atom_to_list(not lists:member(delay_commit, Options)),
+    Part1 = case UpdateType of
     replicated_changes ->
-        {[{new_edits, false}, {docs, DocList1}]};
+        <<"{\"new_edits\":false,\"docs\":[">>;
     interactive_edit ->
-        {[{docs, DocList1}]}
+        <<"{\"docs\":[">>
+    end,
+    BodyFun = fun(eof) ->
+            eof;
+        ([]) ->
+            {ok, <<"]}">>, eof};
+        ([Part | RestParts]) when is_binary(Part) ->
+            {ok, Part, RestParts};
+        ([Doc | RestParts]) ->
+            DocJson = couch_doc:to_json_obj(Doc, [revs, attachments]),
+            Data = case RestParts of
+            [] ->
+                ?JSON_ENCODE(DocJson);
+            _ ->
+                [?JSON_ENCODE(DocJson), ","]
+            end,
+            {ok, Data, RestParts}
     end,
-    FullCommit = atom_to_list(not lists:member(delay_commit, Options)),
     send_req(
         HttpDb,
-        [{method, post}, {path, "_bulk_docs"}, {body, ?JSON_ENCODE(Body)},
+        [{method, post}, {path, "_bulk_docs"},
+            {body, {chunkify, BodyFun, [Part1 | DocList]}},
             {headers, [
                 {"X-Couch-Full-Commit", FullCommit},
                 {"Content-Type", "application/json"} ]}],

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl?rev=1004596&r1=1004595&r2=1004596&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap_httpc.erl Tue Oct  5 09:54:17 2010
@@ -32,8 +32,14 @@ httpdb_setup(#httpdb{} = Db) ->
 
 send_req(#httpdb{headers = BaseHeaders} = HttpDb, Params, Callback) ->
     Method = get_value(method, Params, get),
-    Headers = get_value(headers, Params, []),
-    Body = get_value(body, Params, []),
+    Headers1 = get_value(headers, Params, []),
+    {Body, Headers} = case get_value(body, Params, []) of
+    {chunkify, BodyFun, Acc0} ->
+        NewBodyFun = chunkify_fun(BodyFun),
+        {{NewBodyFun, Acc0}, [{"Transfer-Encoding", "chunked"} | Headers1]};
+    Else ->
+        {Else, Headers1}
+    end,
     IbrowseOptions = [
         {response_format, binary}, {inactivity_timeout, HttpDb#httpdb.timeout}
         | get_value(ibrowse_options, Params, []) ++ HttpDb#httpdb.proxy_options
@@ -208,3 +214,21 @@ redirect_url(RespHeaders, OrigUrl) ->
 after_redirect(RedirectUrl, HttpDb, Params) ->
     Params2 = lists:keydelete(path, 1, lists:keydelete(qs, 1, Params)),
     {HttpDb#httpdb{url = RedirectUrl}, Params2}.
+
+
+chunkify_fun(BodyFun) ->
+    fun(eof_body_fun) ->
+        eof;
+    (Acc) ->
+        case BodyFun(Acc) of
+        eof ->
+            {ok, <<"0\r\n\r\n">>, eof_body_fun};
+        {ok, Data, NewAcc} ->
+            DataBin = iolist_to_binary(Data),
+            Chunk = [hex_size(DataBin), "\r\n", DataBin, "\r\n"],
+            {ok, iolist_to_binary(Chunk), NewAcc}
+        end
+    end.
+
+hex_size(Bin) ->
+    hd(io_lib:format("~.16B", [size(Bin)])).