You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@couchdb.apache.org by ko...@apache.org on 2022/02/23 21:41:45 UTC
[couchdb] branch 3939-multipart-replicated-changes-race updated: Ensure the parser always monitors the worker
This is an automated email from the ASF dual-hosted git repository.
kocolosk pushed a commit to branch 3939-multipart-replicated-changes-race
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/3939-multipart-replicated-changes-race by this push:
new 883ad53 Ensure the parser always monitors the worker
883ad53 is described below
commit 883ad531bc091b631d522115c7f79198c58be372
Author: Adam Kocoloski <ko...@apache.org>
AuthorDate: Wed Feb 23 15:45:57 2022 -0500
Ensure the parser always monitors the worker
This adds an extra `hello_from_writer` message into the handshake
between the process that reads the multipart attachment data from the
socket and the writer processes (potentially on remote nodes) that
persist the data into each shard file. This ensures that even in the
case where a writer does not end up asking for the data (e.g. because
the revision already exists in the tree), the parser will monitor the
writer and therefore know when the writer has exited.
The patch assumes that the attachment flush function runs in the same
process that was originally spawned to handle the fabric_rpc work
request. That is true today, but if it changed in the future, the
resulting breakage would be non-obvious and difficult to debug.
---
src/couch/src/couch_httpd_multipart.erl | 22 +++++++++++++---------
src/fabric/src/fabric_rpc.erl | 16 +++++++---------
2 files changed, 20 insertions(+), 18 deletions(-)
diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl
index ecdf105..26febfb 100644
--- a/src/couch/src/couch_httpd_multipart.erl
+++ b/src/couch/src/couch_httpd_multipart.erl
@@ -104,6 +104,10 @@ mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) ->
receive
abort_parsing ->
ok;
+ {hello_from_writer, Ref, WriterPid} ->
+ WriterRef = erlang:monitor(process, WriterPid),
+ NewCounters = orddict:store(WriterPid, {WriterRef, 0}, Counters),
+ mp_parse_atts(eof, {Ref, Chunks, Offset, NewCounters, Waiting});
{get_bytes, Ref, From} ->
C2 = update_writer(From, Counters),
case maybe_send_data({Ref, Chunks, Offset, C2, [From | Waiting]}) of
@@ -134,6 +138,10 @@ mp_abort_parse_atts(_, _) ->
maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
receive
+ {hello_from_writer, Ref, WriterPid} ->
+ WriterRef = erlang:monitor(process, WriterPid),
+ NewCounters = orddict:store(WriterPid, {WriterRef, 0}, Counters),
+ maybe_send_data({Ref, Chunks, Offset, NewCounters, Waiting});
{get_bytes, Ref, From} ->
NewCounters = update_writer(From, Counters),
maybe_send_data({Ref, Chunks, Offset, NewCounters, [From | Waiting]})
@@ -195,6 +203,10 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting},
maybe_send_data(NewAcc)
end;
+ {hello_from_writer, Ref, WriterPid} ->
+ WriterRef = erlang:monitor(process, WriterPid),
+ C2 = orddict:store(WriterPid, {WriterRef, 0}, Counters),
+ maybe_send_data({Ref, NewChunks, NewOffset, C2, Waiting});
{get_bytes, Ref, X} ->
C2 = update_writer(X, Counters),
maybe_send_data({Ref, NewChunks, NewOffset, C2, [X | NewWaiting]})
@@ -206,15 +218,7 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
update_writer(WriterPid, Counters) ->
UpdateFun = fun({WriterRef, Count}) -> {WriterRef, Count + 1} end,
- InitialValue =
- case orddict:find(WriterPid, Counters) of
- {ok, IV} ->
- IV;
- error ->
- WriterRef = erlang:monitor(process, WriterPid),
- {WriterRef, 1}
- end,
- orddict:update(WriterPid, UpdateFun, InitialValue, Counters).
+ orddict:update(WriterPid, UpdateFun, Counters).
remove_writer(WriterPid, WriterRef, Counters) ->
case orddict:find(WriterPid, Counters) of
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index a90c94a..80a2de6 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -638,16 +638,14 @@ make_att_readers([#doc{atts = Atts0} = Doc | Rest]) ->
[Doc#doc{atts = Atts} | make_att_readers(Rest)].
make_att_reader({follows, Parser, Ref}) ->
+ % This code will fail if the returned closure is called by a
+ % process other than the one that called make_att_reader/1 in the
+ % first place. The reason we don't put everything inside the
+ % closure is that the `hello_from_writer` message must *always* be
+ % sent to the parser, even if the closure never gets called.
+ ParserRef = erlang:monitor(process, Parser),
+ Parser ! {hello_from_writer, Ref, self()},
fun() ->
- ParserRef =
- case get(mp_parser_ref) of
- undefined ->
- PRef = erlang:monitor(process, Parser),
- put(mp_parser_ref, PRef),
- PRef;
- Else ->
- Else
- end,
Parser ! {get_bytes, Ref, self()},
receive
{bytes, Ref, Bytes} ->