You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2011/05/31 22:19:41 UTC

svn commit: r1129900 - in /couchdb/trunk/src/couchdb: couch_api_wrap.erl couch_replicator_doc_copier.erl

Author: fdmanana
Date: Tue May 31 20:19:40 2011
New Revision: 1129900

URL: http://svn.apache.org/viewvc?rev=1129900&view=rev
Log:
Skip multipart attachments if doc is rejected by the target
    
If the target rejects the document (via validate_doc_update functions),
the connection is held for a long time because the attachments are not
streamed.


Modified:
    couchdb/trunk/src/couchdb/couch_api_wrap.erl
    couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl

Modified: couchdb/trunk/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_api_wrap.erl?rev=1129900&r1=1129899&r2=1129900&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/trunk/src/couchdb/couch_api_wrap.erl Tue May 31 20:19:40 2011
@@ -179,7 +179,7 @@ open_doc_revs(#httpdb{} = HttpDb, Id, Re
     end;
 open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
     {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
-    {ok, lists:foldl(Fun, Acc, Results)}.
+    {ok, lists:foldl(fun(R, A) -> {_, A2} = Fun(R, A), A2 end, Acc, Results)}.
 
 
 open_doc(#httpdb{} = Db, Id, Options) ->
@@ -475,20 +475,25 @@ receive_docs(Streamer, UserFun, UserAcc)
         {"multipart/related", _} = ContentType ->
             case doc_from_multi_part_stream(
                 ContentType, fun() -> receive_doc_data(Streamer) end) of
-            {ok, Doc} ->
-                UserAcc2 = UserFun({ok, Doc}, UserAcc),
+            {ok, Doc, Parser} ->
+                case UserFun({ok, Doc}, UserAcc) of
+                {ok, UserAcc2} ->
+                    ok;
+                {skip, UserAcc2} ->
+                    skip_multipart_atts(Parser)
+                end,
                 receive_docs(Streamer, UserFun, UserAcc2)
             end;
         {"application/json", []} ->
             Doc = couch_doc:from_json_obj(
                     ?JSON_DECODE(receive_all(Streamer, []))),
-            UserAcc2 = UserFun({ok, Doc}, UserAcc),
+            {_, UserAcc2} = UserFun({ok, Doc}, UserAcc),
             receive_docs(Streamer, UserFun, UserAcc2);
         {"application/json", [{"error","true"}]} ->
             {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, [])),
             Rev = get_value(<<"missing">>, ErrorProps),
             Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
-            UserAcc2 = UserFun(Result, UserAcc),
+            {_, UserAcc2} = UserFun(Result, UserAcc),
             receive_docs(Streamer, UserFun, UserAcc2)
         end;
     done ->
@@ -496,6 +501,24 @@ receive_docs(Streamer, UserFun, UserAcc)
     end.
 
 
+skip_multipart_atts(Parser) ->
+    skip_multipart_atts(Parser, erlang:monitor(process, Parser)).
+
+skip_multipart_atts(Parser, MonRef) ->
+    case is_process_alive(Parser) of
+    true ->
+        Parser ! {get_bytes, self()},
+        receive
+        {bytes, Bytes} ->
+             skip_multipart_atts(Parser, MonRef);
+        {'DOWN', MonRef, _, _, _} ->
+             ok
+        end;
+    false ->
+        erlang:demonitor(MonRef, [flush])
+    end.
+
+
 restart_remote_open_doc_revs() ->
     receive
     {body_bytes, _} ->
@@ -601,7 +624,7 @@ doc_from_multi_part_stream(ContentType, 
             (A) ->
                 A
             end, Doc#doc.atts),
-        {ok, Doc#doc{atts = Atts2}}
+        {ok, Doc#doc{atts = Atts2}, Parser}
     end.
 
 

Modified: couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl?rev=1129900&r1=1129899&r2=1129900&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl Tue May 31 20:19:40 2011
@@ -350,7 +350,7 @@ fetch_doc(Source, {Id, Revs, PAs, _Seq},
 local_doc_handler({ok, Doc}, {Target, DocList, W, F}) ->
     case batch_doc(Doc) of
     true ->
-        {Target, [Doc | DocList], W, F};
+        {ok, {Target, [Doc | DocList], W, F}};
     false ->
         ?LOG_DEBUG("Worker flushing doc with attachments", []),
         Target2 = open_db(Target),
@@ -358,18 +358,18 @@ local_doc_handler({ok, Doc}, {Target, Do
         close_db(Target2),
         case Success of
         true ->
-            {Target, DocList, W + 1, F};
+            {ok, {Target, DocList, W + 1, F}};
         false ->
-            {Target, DocList, W, F + 1}
+            {ok, {Target, DocList, W, F + 1}}
         end
     end;
 local_doc_handler(_, Acc) ->
-    Acc.
+    {ok, Acc}.
 
 
 remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) ->
     ok = gen_server:call(Parent, {batch_doc, Doc}, infinity),
-    Acc;
+    {ok, Acc};
 remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
     % Immediately flush documents with attachments received from a remote
     % source. The data property of each attachment is a function that starts
@@ -380,9 +380,14 @@ remote_doc_handler({ok, Doc}, {Parent, T
     Success = (flush_doc(Target2, Doc) =:= ok),
     ok = gen_server:call(Parent, {doc_flushed, Success}, infinity),
     close_db(Target2),
-    Acc;
+    case Success of
+    true ->
+        {ok, Acc};
+    false ->
+        {skip, Acc}
+    end;
 remote_doc_handler(_, Acc) ->
-    Acc.
+    {ok, Acc}.
 
 
 spawn_writer(Target, #batch{docs = DocList, size = Size}) ->