You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2018/06/20 01:11:46 UTC
[couchdb] branch master updated: Prepare to fabric attachment
receiver from a fun closure to a tuple
This is an automated email from the ASF dual-hosted git repository.
vatamane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git
The following commit(s) were added to refs/heads/master by this push:
new fe53e43 Prepare to fabric attachment receiver from a fun closure to a tuple
fe53e43 is described below
commit fe53e437ca5ec9d23aa1b55d7934daced157a9e3
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Jun 19 15:05:00 2018 -0400
Prepare to fabric attachment receiver from a fun closure to a tuple
Passing closures around is fragile and prevents smooth upgrading. Instead, pass
a tuple with the data from the receiver closure explicitly and convert it back
to a local fun on each node.
This is a preparatory commit before the switch. To ensure attachment upload
requests keep succeeding, this change must first be installed on all the
nodes. Then, in a separate upgrade step, switch fabric.erl to call
fabric_doc_atts:receiver instead of fabric_doc_attachments:receiver.
---
src/fabric/src/fabric_doc_atts.erl | 168 +++++++++++++++++++++++++++++++++++++
src/fabric/src/fabric_rpc.erl | 2 +
2 files changed, 170 insertions(+)
diff --git a/src/fabric/src/fabric_doc_atts.erl b/src/fabric/src/fabric_doc_atts.erl
new file mode 100644
index 0000000..7ef5dd8
--- /dev/null
+++ b/src/fabric/src/fabric_doc_atts.erl
@@ -0,0 +1,168 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_doc_atts).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-export([
+ receiver/2,
+ receiver_callback/2
+]).
+
+
+%% Build the attachment body descriptor for an upload request.
+%%
+%% The second argument selects the transfer mode:
+%%   undefined | 0                   -> an empty binary, nothing is spawned
+%%   chunked                         -> spawn a middleman, return a receiver tuple
+%%   any other integer               -> answer a pending "Expect: 100-continue",
+%%                                      spawn a middleman, return a receiver tuple
+%%   {unknown_transfer_encoding, _}  -> exit with that same tuple
+%%   anything else                   -> exit with {length_not_integer, Value}
+%%
+%% The receiver tuple is {fabric_attachment_receiver, MiddlemanPid, Mode}.
+receiver(_Req, undefined) ->
+    <<>>;
+receiver(_Req, {unknown_transfer_encoding, _} = Reason) ->
+    exit(Reason);
+receiver(Req, chunked) ->
+    receiver_handle(Req, chunked);
+receiver(_Req, 0) ->
+    <<>>;
+receiver(Req, Length) when is_integer(Length) ->
+    maybe_send_continue(Req),
+    receiver_handle(Req, Length);
+receiver(_Req, Length) ->
+    exit({length_not_integer, Length}).
+
+%% Spawn the (unlinked) middleman process for this request and wrap its pid,
+%% together with the transfer mode, in the tagged receiver tuple.
+receiver_handle(Req, Mode) ->
+    Pid = spawn(fun() -> middleman(Req, Mode) end),
+    {fabric_attachment_receiver, Pid, Mode}.
+
+
+%% Convert the middleman pid and transfer mode carried by a receiver tuple back
+%% into a local fun that pulls attachment data from that middleman.
+%%
+%% chunked -> a 3-arity fun that only accepts 4096 as its first argument and
+%%            streams every chunk through the supplied chunk fun.
+%% integer -> a 0-arity fun; each call requests the next piece of data.
+receiver_callback(Middleman, chunked) ->
+    fun(4096, OnChunk, Acc) ->
+        write_chunks(Middleman, OnChunk, Acc)
+    end;
+receiver_callback(Middleman, Length) when is_integer(Length) ->
+    fun() -> request_data(Middleman) end.
+
+%% Ask the middleman for data and wait for its answer. The answer is
+%% acknowledged through rexi before it is returned; the calling process exits
+%% with `timeout` if nothing arrives within the attachments timeout.
+request_data(Middleman) ->
+    Middleman ! {self(), gimme_data},
+    Timeout = fabric_util:attachments_timeout(),
+    receive
+        {Middleman, Data} ->
+            rexi:reply(attachment_chunk_received),
+            Data
+    after Timeout ->
+        exit(timeout)
+    end.
+
+
+%%
+%% internal
+%%
+
+%% Start a raw 100 response when the request carries an "Expect" header whose
+%% value is "100-continue" (compared case-insensitively); otherwise return ok.
+maybe_send_continue(#httpd{mochi_req = MochiReq} = Req) ->
+    Expect = couch_httpd:header_value(Req, "expect"),
+    case expects_continue(Expect) of
+        true ->
+            MochiReq:start_raw_response({100, gb_trees:empty()});
+        false ->
+            ok
+    end.
+
+%% True only for a present header value that lowercases to "100-continue".
+expects_continue(undefined) ->
+    false;
+expects_continue(Expect) ->
+    string:to_lower(Expect) =:= "100-continue".
+
+%% Pull chunk batches from the middleman and feed them through ChunkFun until
+%% the terminating chunk has been seen, then return the final accumulator.
+%% Each received batch is acknowledged through rexi. Exits with `timeout` if
+%% the middleman does not answer within the attachments timeout.
+write_chunks(Middleman, ChunkFun, Acc) ->
+    Middleman ! {self(), gimme_data},
+    Timeout = fabric_util:attachments_timeout(),
+    receive
+        {Middleman, Chunks} ->
+            rexi:reply(attachment_chunk_received),
+            Flushed = flush_chunks(Chunks, ChunkFun, Acc),
+            after_flush(Flushed, Middleman, ChunkFun)
+    after Timeout ->
+        exit(timeout)
+    end.
+
+%% Keep pulling while flush_chunks reports `continue`; stop on `done`.
+after_flush({continue, Acc}, Middleman, ChunkFun) ->
+    write_chunks(Middleman, ChunkFun, Acc);
+after_flush({done, Acc}, _Middleman, _ChunkFun) ->
+    Acc.
+
+%% Fold ChunkFun over a batch of chunk records, left to right.
+%%
+%% Returns {done, Acc} when the batch ends with a zero-length chunk record
+%% ({0, _}); that final record is not passed to ChunkFun. Returns
+%% {continue, Acc} when the batch runs out without one. A {0, _} record that
+%% is not the last element is handed to ChunkFun like any other chunk.
+flush_chunks([{0, _}], _ChunkFun, Acc) ->
+    {done, Acc};
+flush_chunks([], _ChunkFun, Acc) ->
+    {continue, Acc};
+flush_chunks([Next | Remaining], ChunkFun, Acc) ->
+    flush_chunks(Remaining, ChunkFun, ChunkFun(Next, Acc)).
+
+%% Read a fixed-length request body on demand.
+%%
+%% Waits for a {Pid, go} message, reads whatever couch_httpd:recv(Req, 0)
+%% returns, sends it back to Pid as {self(), Data}, and repeats with the
+%% remaining byte count until that count reaches 0, at which point it
+%% returns ok.
+receive_unchunked_attachment(_Req, 0) ->
+    ok;
+receive_unchunked_attachment(Req, Remaining) ->
+    receive
+        {Requester, go} ->
+            Bytes = couch_httpd:recv(Req, 0),
+            Requester ! {self(), Bytes},
+            receive_unchunked_attachment(Req, Remaining - size(Bytes))
+    end.
+
+%% Entry point of the middleman process: spawn a receiver that reads the
+%% request body (chunked or fixed-length) and then serve the data it produces
+%% to the writers.
+middleman(Req, chunked) ->
+    % The receiver hands over one chunk record per `go` request it gets.
+    Forward = fun(ChunkRecord, ok) ->
+        receive
+            {From, go} -> From ! {self(), ChunkRecord}
+        end,
+        ok
+    end,
+    serve_writers(spawn(fun() -> couch_httpd:recv_chunked(Req, 4096, Forward, ok) end));
+middleman(Req, Length) ->
+    serve_writers(spawn(fun() -> receive_unchunked_attachment(Req, Length) end)).
+
+%% Take requests from the DB writers and get data from the receiver. The
+%% number of writers is read from the "n" key of the "cluster" config section.
+serve_writers(Receiver) ->
+    N = erlang:list_to_integer(config:get("cluster", "n")),
+    Timeout = fabric_util:attachments_timeout(),
+    middleman_loop(Receiver, N, [], [], Timeout).
+
+%% Serve buffered attachment chunks to the writers.
+%%
+%% Receiver   - pid that answers {self(), go} with {Receiver, ChunkRecord}.
+%% N          - number of writers that must have consumed a chunk before it
+%%              may be dropped from the buffer.
+%% Counters0  - fabric_dict of {WriterPid, Index}, where Index is the position
+%%              in the buffer of the first chunk that writer has not been sent.
+%% ChunkList0 - buffered chunk records, oldest first.
+%% Timeout    - how long to wait for the next gimme_data request.
+%%
+%% Returns ok when no request arrives within Timeout (the receiver is killed
+%% first), or when the receiver dies before delivering a requested chunk.
+middleman_loop(Receiver, N, Counters0, ChunkList0, Timeout) ->
+    receive
+        {From, gimme_data} ->
+            % Figure out how far along this writer (From) is in the list
+            ListIndex = writer_index(From, Counters0),
+
+            % Talk to the receiver to get another chunk if necessary
+            case ensure_chunk(Receiver, ListIndex, ChunkList0) of
+                {ok, ChunkList1} ->
+                    % Reply to the writer with everything it has not seen yet
+                    Reply = lists:nthtail(ListIndex, ChunkList1),
+                    From ! {self(), Reply},
+
+                    % Update the counter for this writer
+                    Counters1 = fabric_dict:update_counter(
+                        From, length(Reply), Counters0
+                    ),
+
+                    % Drop any chunks that have been sent to all writers
+                    {ChunkList2, Counters2} =
+                        drop_delivered(N, ChunkList1, Counters1),
+                    middleman_loop(Receiver, N, Counters2, ChunkList2, Timeout);
+                receiver_down ->
+                    % No more data can ever arrive; stop instead of blocking
+                    % forever. Waiting writers hit their own timeout.
+                    ok
+            end
+    after Timeout ->
+        exit(Receiver, kill),
+        ok
+    end.
+
+%% Buffer position of the next chunk for this writer; 0 for an unseen writer.
+writer_index(From, Counters) ->
+    case fabric_dict:lookup_element(From, Counters) of
+        undefined -> 0;
+        Index -> Index
+    end.
+
+%% Make sure the buffer holds a chunk at ListIndex. When the writer has already
+%% consumed the whole buffer, ask the receiver for one more chunk and append
+%% it. The receiver is monitored while waiting so that its death yields
+%% `receiver_down` rather than an endless wait.
+ensure_chunk(Receiver, ListIndex, ChunkList) when ListIndex == length(ChunkList) ->
+    MRef = erlang:monitor(process, Receiver),
+    Receiver ! {self(), go},
+    receive
+        {Receiver, ChunkRecord} ->
+            erlang:demonitor(MRef, [flush]),
+            {ok, ChunkList ++ [ChunkRecord]};
+        {'DOWN', MRef, process, Receiver, _Reason} ->
+            receiver_down
+    end;
+ensure_chunk(_Receiver, _ListIndex, ChunkList) ->
+    {ok, ChunkList}.
+
+%% Once all N writers are known and each has consumed at least one buffered
+%% chunk, drop the chunks every writer has seen and shift the indexes to match.
+drop_delivered(N, ChunkList, Counters) ->
+    Size = fabric_dict:size(Counters),
+    NumToDrop = lists:min([I || {_, I} <- Counters]),
+    if
+        Size == N andalso NumToDrop > 0 ->
+            {
+                lists:nthtail(NumToDrop, ChunkList),
+                [{F, I - NumToDrop} || {F, I} <- Counters]
+            };
+        true ->
+            {ChunkList, Counters}
+    end.
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 913aafe0..60526f4 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -440,6 +440,8 @@ make_att_reader({follows, Parser, Ref}) ->
throw({mp_parser_died, Reason})
end
end;
+make_att_reader({fabric_attachment_receiver, Middleman, Length}) ->
+ fabric_doc_atts:receiver_callback(Middleman, Length);
make_att_reader(Else) ->
Else.