You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2018/06/20 01:11:46 UTC

[couchdb] branch master updated: Prepare to fabric attachment receiver from a fun closure to a tuple

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fe53e43  Prepare to fabric attachment receiver from a fun closure to a tuple
fe53e43 is described below

commit fe53e437ca5ec9d23aa1b55d7934daced157a9e3
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Jun 19 15:05:00 2018 -0400

    Prepare to fabric attachment receiver from a fun closure to a tuple
    
    Passing closures around is fragile and prevents smooth upgrading. Instead,
    pass a tuple with the data from the receiver closure explicitly and convert
    it back to a local fun on each node.
    
    This is a preparatory commit before the switch. To ensure attachment upload
    requests are successful, this change would first need to be installed on all
    the nodes. Then, in a separate upgrade step, switch fabric.erl to call
    fabric_doc_atts:receiver instead of fabric_doc_attachments:receiver.
---
 src/fabric/src/fabric_doc_atts.erl | 168 +++++++++++++++++++++++++++++++++++++
 src/fabric/src/fabric_rpc.erl      |   2 +
 2 files changed, 170 insertions(+)

diff --git a/src/fabric/src/fabric_doc_atts.erl b/src/fabric/src/fabric_doc_atts.erl
new file mode 100644
index 0000000..7ef5dd8
--- /dev/null
+++ b/src/fabric/src/fabric_doc_atts.erl
@@ -0,0 +1,168 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_doc_atts).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-export([
+    receiver/2,
+    receiver_callback/2
+]).
+
+
+%% Build the attachment receiver term for a request body.
+%%
+%% The second argument selects what is returned:
+%%   undefined | 0    - an empty binary; no process is started
+%%   chunked          - {fabric_attachment_receiver, Pid, chunked}
+%%   integer Length   - {fabric_attachment_receiver, Pid, Length}, after
+%%                      maybe_send_continue/1 has been called on Req
+%% where Pid is a freshly spawned process running middleman/2.
+%% An {unknown_transfer_encoding, _} tuple exits with that same tuple; any
+%% other value exits with {length_not_integer, Value}.
+receiver(_Req, undefined) ->
+    <<>>;
+receiver(_Req, {unknown_transfer_encoding, _} = Reason) ->
+    exit(Reason);
+receiver(Req, chunked) ->
+    {fabric_attachment_receiver, start_middleman(Req, chunked), chunked};
+receiver(_Req, 0) ->
+    <<>>;
+receiver(Req, Length) when is_integer(Length) ->
+    maybe_send_continue(Req),
+    {fabric_attachment_receiver, start_middleman(Req, Length), Length};
+receiver(_Req, NotAnInteger) ->
+    exit({length_not_integer, NotAnInteger}).
+
+%% Spawn (without linking) a process that runs middleman/2 in the given mode.
+start_middleman(Req, Mode) ->
+    spawn(fun() -> middleman(Req, Mode) end).
+
+
+%% Rebuild a local reader fun from the Middleman pid and length carried in a
+%% {fabric_attachment_receiver, Middleman, Length} tuple.
+%%
+%% chunked: returns a 3-arity fun whose first argument must be 4096; it hands
+%% the remaining two arguments to write_chunks/3.
+%% integer: returns a 0-arity fun that asks Middleman for data, acknowledges
+%% the reply with rexi:reply/1 and returns the data. It exits with `timeout`
+%% when no reply arrives within fabric_util:attachments_timeout().
+receiver_callback(Middleman, chunked) ->
+    fun(4096, Writer, Acc) -> write_chunks(Middleman, Writer, Acc) end;
+receiver_callback(Middleman, Length) when is_integer(Length) ->
+    fun() ->
+        Middleman ! {self(), gimme_data},
+        WaitFor = fabric_util:attachments_timeout(),
+        receive
+            {Middleman, Payload} ->
+                rexi:reply(attachment_chunk_received),
+                Payload
+        after WaitFor ->
+            exit(timeout)
+        end
+    end.
+
+
+%%
+%% internal
+%%
+
+%% Start a raw 100 response on the underlying mochiweb request when the
+%% "expect" header, compared case-insensitively, equals "100-continue".
+%% Returns ok in every other case.
+maybe_send_continue(#httpd{mochi_req = MochiReq} = Req) ->
+    continue_if_expected(couch_httpd:header_value(Req, "expect"), MochiReq).
+
+%% Dispatch on the raw "expect" header value (undefined when absent).
+continue_if_expected(undefined, _MochiReq) ->
+    ok;
+continue_if_expected(Expect, MochiReq) ->
+    case string:to_lower(Expect) of
+        "100-continue" ->
+            MochiReq:start_raw_response({100, gb_trees:empty()});
+        _Other ->
+            ok
+    end.
+
+%% Repeatedly request chunk records from the middleman and feed them through
+%% flush_chunks/3 until it reports `done`, then return the final state.
+%% Each batch received is acknowledged with rexi:reply/1. Exits with
+%% `timeout` if the middleman does not answer within
+%% fabric_util:attachments_timeout().
+write_chunks(Middleman, Writer, Acc0) ->
+    Middleman ! {self(), gimme_data},
+    WaitFor = fabric_util:attachments_timeout(),
+    receive
+        {Middleman, Chunks} ->
+            rexi:reply(attachment_chunk_received),
+            case flush_chunks(Chunks, Writer, Acc0) of
+                {done, Final} ->
+                    Final;
+                {continue, Acc1} ->
+                    write_chunks(Middleman, Writer, Acc1)
+            end
+    after WaitFor ->
+        exit(timeout)
+    end.
+
+%% Fold Writer over a list of chunk records, threading the state through.
+%% Returns {done, State} when the list ends in a sole {0, _} record (which is
+%% not passed to Writer), and {continue, State} when the list runs out
+%% without one. A {0, _} record that is not the last element is treated like
+%% any other chunk.
+flush_chunks([{0, _}], _Writer, Acc) ->
+    {done, Acc};
+flush_chunks([Chunk | Remaining], Writer, Acc) ->
+    flush_chunks(Remaining, Writer, Writer(Chunk, Acc));
+flush_chunks([], _Writer, Acc) ->
+    {continue, Acc}.
+
+%% Receiver loop for a body of known length. For every {Pid, go} request it
+%% reads data with couch_httpd:recv(Req, 0), sends it back to Pid tagged with
+%% self(), and subtracts its size from the remaining count. Returns ok once
+%% the remaining count reaches exactly 0.
+%% NOTE(review): presumably a length of 0 asks recv for whatever is
+%% available - confirm against couch_httpd.
+receive_unchunked_attachment(_Req, 0) ->
+    ok;
+receive_unchunked_attachment(Req, Remaining) ->
+    Received =
+        receive
+            {Requester, go} ->
+                Bytes = couch_httpd:recv(Req, 0),
+                Requester ! {self(), Bytes},
+                Bytes
+        end,
+    receive_unchunked_attachment(Req, Remaining - size(Received)).
+
+%% Body of the middleman process: spawn a receiver that reads the uploaded
+%% data from Req, then serve the writers' requests via middleman_loop/5.
+%%
+%% chunked: the receiver runs couch_httpd:recv_chunked/4 with a callback that
+%% waits for a {Pid, go} request before handing each chunk record to Pid.
+%% Length:  the receiver runs receive_unchunked_attachment/2.
+middleman(Req, chunked) ->
+    % hand over one chunk record per `go` request
+    Forward = fun(ChunkRecord, ok) ->
+        receive
+            {Requester, go} -> Requester ! {self(), ChunkRecord}
+        end,
+        ok
+    end,
+    serve_writers(spawn(fun() -> couch_httpd:recv_chunked(Req, 4096, Forward, ok) end));
+middleman(Req, Length) ->
+    serve_writers(spawn(fun() -> receive_unchunked_attachment(Req, Length) end)).
+
+%% Enter middleman_loop/5 with empty counters and buffer, taking the writer
+%% count from the "cluster"/"n" config value and the idle timeout from
+%% fabric_util:attachments_timeout().
+serve_writers(Receiver) ->
+    N = erlang:list_to_integer(config:get("cluster", "n")),
+    Timeout = fabric_util:attachments_timeout(),
+    middleman_loop(Receiver, N, [], [], Timeout).
+
+%% Serve {Pid, gimme_data} requests from writer processes, fetching chunk
+%% records from Receiver on demand and buffering them until they have been
+%% sent to N distinct writers.
+%%
+%%   Receiver   - pid that answers {self(), go} with {Receiver, ChunkRecord}
+%%   N          - number of distinct writers that must be known before any
+%%                buffered chunk may be dropped
+%%   Counters0  - list of {WriterPid, Count}; Count is how many entries of
+%%                ChunkList0 (from its head) that writer has already been sent
+%%   ChunkList0 - buffered chunk records, oldest first
+%%   Timeout    - how long to wait for the next gimme_data request
+%%
+%% When no request arrives within Timeout, Receiver is killed and the
+%% function returns ok.
+middleman_loop(Receiver, N, Counters0, ChunkList0, Timeout) ->
+    receive {From, gimme_data} ->
+        % Figure out how far along this writer (From) is in the list
+        % (a writer not seen before starts at index 0)
+        ListIndex = case fabric_dict:lookup_element(From, Counters0) of
+        undefined -> 0;
+        I -> I
+        end,
+
+        % Talk to the receiver to get another chunk if necessary, i.e. when
+        % this writer has already been sent everything currently buffered.
+        % NOTE(review): the inner receive has no `after` clause, so the loop
+        % blocks here until Receiver replies.
+        ChunkList1 = if ListIndex == length(ChunkList0) ->
+            Receiver ! {self(), go},
+            receive
+                {Receiver, ChunkRecord} ->
+                    ChunkList0 ++ [ChunkRecord]
+            end;
+        true -> ChunkList0 end,
+
+        % reply to the writer with every buffered chunk it has not yet seen
+        Reply = lists:nthtail(ListIndex, ChunkList1),
+        From ! {self(), Reply},
+
+        % Update the counter for this writer
+        Counters1 = fabric_dict:update_counter(From, length(Reply), Counters0),
+
+        % Drop any chunks that have been sent to all writers
+        % (the smallest counter is the number of chunks every known writer
+        % has been sent)
+        Size = fabric_dict:size(Counters1),
+        NumToDrop = lists:min([I || {_, I} <- Counters1]),
+
+        % Only trim once all N writers are known; counters are shifted down
+        % so they stay relative to the head of the shortened list.
+        {ChunkList3, Counters3} =
+        if Size == N andalso NumToDrop > 0 ->
+            ChunkList2 = lists:nthtail(NumToDrop, ChunkList1),
+            Counters2 = [{F, I-NumToDrop} || {F, I} <- Counters1],
+            {ChunkList2, Counters2};
+        true ->
+            {ChunkList1, Counters1}
+        end,
+
+        middleman_loop(Receiver, N, Counters3, ChunkList3, Timeout)
+    after Timeout ->
+        % no writer asked for data in time: stop the receiver and finish
+        exit(Receiver, kill),
+        ok
+    end.
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 913aafe0..60526f4 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -440,6 +440,8 @@ make_att_reader({follows, Parser, Ref}) ->
                 throw({mp_parser_died, Reason})
         end
     end;
+make_att_reader({fabric_attachment_receiver, Middleman, Length}) ->
+    fabric_doc_atts:receiver_callback(Middleman, Length);
 make_att_reader(Else) ->
     Else.