You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ko...@apache.org on 2022/02/23 21:41:45 UTC

[couchdb] branch 3939-multipart-replicated-changes-race updated: Ensure the parser always monitors the worker

This is an automated email from the ASF dual-hosted git repository.

kocolosk pushed a commit to branch 3939-multipart-replicated-changes-race
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/3939-multipart-replicated-changes-race by this push:
     new 883ad53  Ensure the parser always monitors the worker
883ad53 is described below

commit 883ad531bc091b631d522115c7f79198c58be372
Author: Adam Kocoloski <ko...@apache.org>
AuthorDate: Wed Feb 23 15:45:57 2022 -0500

    Ensure the parser always monitors the worker
    
    This adds an extra `hello_from_writer` message into the handshake
    between the process that reads the multipart attachment data from the
    socket and the writer processes (potentially on remote nodes) that
    persist the data into each shard file. This ensures that even in the
    case where a writer does not end up asking for the data (e.g. because
    the revision already exists in the tree), the parser will monitor the
    writer and therefore know when the writer has exited.
    
    The patch assumes that the attachment flush function is executed in
    the same process that was initially spawned to handle the fabric_rpc
    work request. That's true today, but if it changed in the future the
    resulting breakage would be non-obvious and hard to debug.
---
 src/couch/src/couch_httpd_multipart.erl | 22 +++++++++++++---------
 src/fabric/src/fabric_rpc.erl           | 16 +++++++---------
 2 files changed, 20 insertions(+), 18 deletions(-)

diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl
index ecdf105..26febfb 100644
--- a/src/couch/src/couch_httpd_multipart.erl
+++ b/src/couch/src/couch_httpd_multipart.erl
@@ -104,6 +104,10 @@ mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) ->
             receive
                 abort_parsing ->
                     ok;
+                {hello_from_writer, Ref, WriterPid} ->
+                    WriterRef = erlang:monitor(process, WriterPid),
+                    NewCounters = orddict:store(WriterPid, {WriterRef, 0}, Counters),
+                    mp_parse_atts(eof, {Ref, Chunks, Offset, NewCounters, Waiting});
                 {get_bytes, Ref, From} ->
                     C2 = update_writer(From, Counters),
                     case maybe_send_data({Ref, Chunks, Offset, C2, [From | Waiting]}) of
@@ -134,6 +138,10 @@ mp_abort_parse_atts(_, _) ->
 
 maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
     receive
+        {hello_from_writer, Ref, WriterPid} ->
+            WriterRef = erlang:monitor(process, WriterPid),
+            NewCounters = orddict:store(WriterPid, {WriterRef, 0}, Counters),
+            maybe_send_data({Ref, Chunks, Offset, NewCounters, Waiting});
         {get_bytes, Ref, From} ->
             NewCounters = update_writer(From, Counters),
             maybe_send_data({Ref, Chunks, Offset, NewCounters, [From | Waiting]})
@@ -195,6 +203,10 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
                                 NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting},
                                 maybe_send_data(NewAcc)
                         end;
+                    {hello_from_writer, Ref, WriterPid} ->
+                        WriterRef = erlang:monitor(process, WriterPid),
+                        C2 = orddict:store(WriterPid, {WriterRef, 0}, Counters),
+                        maybe_send_data({Ref, NewChunks, NewOffset, C2, Waiting});
                     {get_bytes, Ref, X} ->
                         C2 = update_writer(X, Counters),
                         maybe_send_data({Ref, NewChunks, NewOffset, C2, [X | NewWaiting]})
@@ -206,15 +218,7 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
 
 update_writer(WriterPid, Counters) ->
     UpdateFun = fun({WriterRef, Count}) -> {WriterRef, Count + 1} end,
-    InitialValue =
-        case orddict:find(WriterPid, Counters) of
-            {ok, IV} ->
-                IV;
-            error ->
-                WriterRef = erlang:monitor(process, WriterPid),
-                {WriterRef, 1}
-        end,
-    orddict:update(WriterPid, UpdateFun, InitialValue, Counters).
+    orddict:update(WriterPid, UpdateFun, Counters).
 
 remove_writer(WriterPid, WriterRef, Counters) ->
     case orddict:find(WriterPid, Counters) of
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index a90c94a..80a2de6 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -638,16 +638,14 @@ make_att_readers([#doc{atts = Atts0} = Doc | Rest]) ->
     [Doc#doc{atts = Atts} | make_att_readers(Rest)].
 
 make_att_reader({follows, Parser, Ref}) ->
+    % This code will fail if the returned closure is called by a
+    % process other than the one that called make_att_reader/1 in the
+    % first place. The reason we don't put everything inside the
+    % closure is that the `hello_from_writer` message must *always* be
+    % sent to the parser, even if the closure never gets called.
+    ParserRef = erlang:monitor(process, Parser),
+    Parser ! {hello_from_writer, Ref, self()},
     fun() ->
-        ParserRef =
-            case get(mp_parser_ref) of
-                undefined ->
-                    PRef = erlang:monitor(process, Parser),
-                    put(mp_parser_ref, PRef),
-                    PRef;
-                Else ->
-                    Else
-            end,
         Parser ! {get_bytes, Ref, self()},
         receive
             {bytes, Ref, Bytes} ->