You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by va...@apache.org on 2018/06/19 21:00:24 UTC

[couchdb] branch do-not-pass-funs-between-nodes-ext created (now 4c054d7)

This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a change to branch do-not-pass-funs-between-nodes-ext
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


      at 4c054d7  Prepare to fabric attachment receiver from a fun closure to a tuple

This branch includes the following new commits:

     new 4c054d7  Prepare to fabric attachment receiver from a fun closure to a tuple

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[couchdb] 01/01: Prepare to fabric attachment receiver from a fun closure to a tuple

Posted by va...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

vatamane pushed a commit to branch do-not-pass-funs-between-nodes-ext
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 4c054d7e46fe0993e6f60878edde891d53b461a5
Author: Nick Vatamaniuc <va...@apache.org>
AuthorDate: Tue Jun 19 15:05:00 2018 -0400

    Prepare to fabric attachment receiver from a fun closure to a tuple
    
    Passing closures around is fragile and prevents smooth upgrades. Instead,
    explicitly pass a tuple with the data from the receiver closure and convert
    it back to a fun locally on each node.
    
    This is a preparatory commit before the switch. To ensure attachment upload
    requests succeed, this change must first be installed on all the nodes.
    Then, in a separate upgrade step, switch fabric.erl to call
    fabric_doc_atts:receiver instead of fabric_doc_attachments:receiver.
---
 src/fabric/src/fabric.erl          |   1 +
 src/fabric/src/fabric_doc_atts.erl | 168 +++++++++++++++++++++++++++++++++++++
 src/fabric/src/fabric_rpc.erl      |   2 +
 3 files changed, 171 insertions(+)

diff --git a/src/fabric/src/fabric.erl b/src/fabric/src/fabric.erl
index 4a07271..a9919b4 100644
--- a/src/fabric/src/fabric.erl
+++ b/src/fabric/src/fabric.erl
@@ -279,6 +279,7 @@ purge_docs(_DbName, _IdsRevs) ->
 att_receiver(Req, Length) ->
     fabric_doc_attachments:receiver(Req, Length).
 
+
 %% @equiv all_docs(DbName, [], Callback, Acc0, QueryArgs)
 all_docs(DbName, Callback, Acc, QueryArgs) ->
     all_docs(DbName, [], Callback, Acc, QueryArgs).
diff --git a/src/fabric/src/fabric_doc_atts.erl b/src/fabric/src/fabric_doc_atts.erl
new file mode 100644
index 0000000..7ef5dd8
--- /dev/null
+++ b/src/fabric/src/fabric_doc_atts.erl
@@ -0,0 +1,168 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(fabric_doc_atts).
+
+-include_lib("fabric/include/fabric.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+-export([
+    receiver/2,
+    receiver_callback/2
+]).
+
+
+%% @doc Build an attachment receiver for an upload of the given length.
+%%
+%% Returns an empty binary when there is nothing to receive (`undefined' or
+%% `0'). For `chunked' and integer lengths a middleman process is spawned and
+%% `{fabric_attachment_receiver, MiddlemanPid, Length}' is returned; that tuple
+%% carries plain data only and is turned back into a fun with
+%% receiver_callback/2. For integer lengths, maybe_send_continue/1 is called
+%% before the middleman is spawned.
+%%
+%% Exits with `{unknown_transfer_encoding, Encoding}' when handed such a
+%% tuple, and with `{length_not_integer, Length}' for any other length.
+receiver(_Req, Length) when Length =:= undefined; Length =:= 0 ->
+    <<"">>;
+receiver(_Req, {unknown_transfer_encoding, _Encoding} = Reason) ->
+    exit(Reason);
+receiver(Req, chunked) ->
+    Pid = spawn(fun() -> middleman(Req, chunked) end),
+    {fabric_attachment_receiver, Pid, chunked};
+receiver(Req, Length) when is_integer(Length) ->
+    maybe_send_continue(Req),
+    Pid = spawn(fun() -> middleman(Req, Length) end),
+    {fabric_attachment_receiver, Pid, Length};
+receiver(_Req, BadLength) ->
+    exit({length_not_integer, BadLength}).
+
+
+%% @doc Convert the data carried in a `fabric_attachment_receiver' tuple back
+%% into a local fun.
+%%
+%% For an integer length the result is a zero-arity fun: each call asks the
+%% middleman for data, acknowledges it via rexi:reply/1 and returns it, or
+%% exits with `timeout' if nothing arrives within
+%% fabric_util:attachments_timeout().
+%%
+%% For `chunked' the result is a three-arity fun that only accepts 4096 as its
+%% first argument and delegates to write_chunks/3.
+receiver_callback(Middleman, Length) when is_integer(Length) ->
+    fun() ->
+        Middleman ! {self(), gimme_data},
+        WaitMs = fabric_util:attachments_timeout(),
+        receive
+            {Middleman, Bytes} ->
+                rexi:reply(attachment_chunk_received),
+                Bytes
+        after WaitMs ->
+            exit(timeout)
+        end
+    end;
+receiver_callback(Middleman, chunked) ->
+    fun(4096, ChunkFun, State) ->
+        write_chunks(Middleman, ChunkFun, State)
+    end.
+
+
+%%
+%% internal
+%%
+
+%% Start a raw 100 response on the underlying mochiweb request when the
+%% request's "expect" header, compared case-insensitively, is "100-continue".
+%% Returns ok without responding when the header is absent or has any other
+%% value.
+maybe_send_continue(#httpd{mochi_req = MochiReq} = Req) ->
+    Expect = couch_httpd:header_value(Req, "expect"),
+    WantsContinue = Expect =/= undefined
+        andalso string:to_lower(Expect) =:= "100-continue",
+    case WantsContinue of
+        true ->
+            MochiReq:start_raw_response({100, gb_trees:empty()});
+        false ->
+            ok
+    end.
+
+%% Repeatedly ask the middleman for chunk records and feed them through
+%% ChunkFun until flush_chunks/3 reports `done'; returns the final state.
+%% Every batch received is acknowledged with rexi:reply/1. Exits with
+%% `timeout' if the middleman does not answer within
+%% fabric_util:attachments_timeout().
+write_chunks(MiddleMan, ChunkFun, State) ->
+    MiddleMan ! {self(), gimme_data},
+    WaitMs = fabric_util:attachments_timeout(),
+    receive
+        {MiddleMan, Chunks} ->
+            rexi:reply(attachment_chunk_received),
+            Outcome = flush_chunks(Chunks, ChunkFun, State),
+            case Outcome of
+                {done, FinalState} ->
+                    FinalState;
+                {continue, NextState} ->
+                    write_chunks(MiddleMan, ChunkFun, NextState)
+            end
+    after WaitMs ->
+        exit(timeout)
+    end.
+
+%% Fold ChunkFun over a list of chunk records, threading the state through.
+%% Returns `{done, State}' when the only remaining record is a 2-tuple whose
+%% first element is 0 (presumably the zero-length terminating chunk; that
+%% record is not passed to ChunkFun), and `{continue, State}' when the list
+%% runs out without reaching such a record.
+flush_chunks([{0, _}], _ChunkFun, Acc) ->
+    {done, Acc};
+flush_chunks([], _ChunkFun, Acc) ->
+    {continue, Acc};
+flush_chunks([Record | Remaining], ChunkFun, Acc) ->
+    flush_chunks(Remaining, ChunkFun, ChunkFun(Record, Acc)).
+
+%% Receive a request body of Length bytes, one piece per `{Pid, go}' message.
+%% For every `go' it reads from the request with couch_httpd:recv(Req, 0),
+%% sends `{self(), Data}' back to the requesting process and recurses with
+%% the remaining byte count. Returns ok once the remaining length reaches 0.
+%% Assumes couch_httpd:recv/2 returns a binary - TODO confirm.
+receive_unchunked_attachment(_Req, 0) ->
+    ok;
+receive_unchunked_attachment(Req, Length) ->
+    % Take the byte count as the value of the receive expression rather than
+    % relying on a variable bound inside it, and use byte_size/1, which is
+    % explicit about measuring a binary (size/1 also accepts tuples).
+    Received = receive
+        {MiddleMan, go} ->
+            Data = couch_httpd:recv(Req, 0),
+            MiddleMan ! {self(), Data},
+            byte_size(Data)
+    end,
+    receive_unchunked_attachment(Req, Length - Received).
+
+%% Body of the middleman process: spawn a process that actually receives the
+%% uploaded data from the request, then serve `gimme_data' requests from the
+%% DB writers via middleman_loop/5.
+middleman(Req, chunked) ->
+    % Each chunk record is held back until some process asks for it with
+    % {From, go}; the record is then sent to that process.
+    HandOff = fun(ChunkRecord, ok) ->
+        receive
+            {From, go} -> From ! {self(), ChunkRecord}
+        end,
+        ok
+    end,
+    Receiver = spawn(fun() -> couch_httpd:recv_chunked(Req, 4096, HandOff, ok) end),
+    serve_writers(Receiver);
+middleman(Req, Length) ->
+    Receiver = spawn(fun() -> receive_unchunked_attachment(Req, Length) end),
+    serve_writers(Receiver).
+
+%% Enter the middleman loop for Receiver with empty counters and an empty
+%% chunk buffer. The expected number of writers is read from the "n" key of
+%% the "cluster" config section.
+serve_writers(Receiver) ->
+    N = erlang:list_to_integer(config:get("cluster","n")),
+    Timeout = fabric_util:attachments_timeout(),
+    middleman_loop(Receiver, N, [], [], Timeout).
+
+%% Serve `{Writer, gimme_data}' requests until none arrives within Timeout,
+%% at which point the receiver process is killed and ok is returned.
+%%
+%% Counters0 maps each writer to the number of buffered chunks it has already
+%% been sent; ChunkList0 is the buffer of chunk records not yet delivered to
+%% every writer. Once N distinct writers are known, chunks that all of them
+%% have been sent are dropped from the front of the buffer.
+middleman_loop(Receiver, N, Counters0, ChunkList0, Timeout) ->
+    receive
+        {Writer, gimme_data} ->
+            % How far along this writer is in the buffered list
+            Offset = case fabric_dict:lookup_element(Writer, Counters0) of
+                undefined -> 0;
+                Seen -> Seen
+            end,
+
+            % Fetch another chunk from the receiver only when this writer
+            % has already been sent everything that is buffered
+            Buffered = case Offset == length(ChunkList0) of
+                true ->
+                    Receiver ! {self(), go},
+                    receive
+                        {Receiver, ChunkRecord} ->
+                            ChunkList0 ++ [ChunkRecord]
+                    end;
+                false ->
+                    ChunkList0
+            end,
+
+            % Send the writer everything it has not been sent yet
+            Pending = lists:nthtail(Offset, Buffered),
+            Writer ! {self(), Pending},
+
+            % Record how many more chunks this writer has now been sent
+            Counters1 = fabric_dict:update_counter(Writer, length(Pending), Counters0),
+
+            % Drop any chunks that have been sent to all writers
+            WriterCount = fabric_dict:size(Counters1),
+            Droppable = lists:min([Count || {_, Count} <- Counters1]),
+
+            {ChunkList1, Counters2} =
+                case WriterCount == N andalso Droppable > 0 of
+                    true ->
+                        Trimmed = lists:nthtail(Droppable, Buffered),
+                        Shifted = [{Pid, Count - Droppable} || {Pid, Count} <- Counters1],
+                        {Trimmed, Shifted};
+                    false ->
+                        {Buffered, Counters1}
+                end,
+
+            middleman_loop(Receiver, N, Counters2, ChunkList1, Timeout)
+    after Timeout ->
+        exit(Receiver, kill),
+        ok
+    end.
diff --git a/src/fabric/src/fabric_rpc.erl b/src/fabric/src/fabric_rpc.erl
index 913aafe0..60526f4 100644
--- a/src/fabric/src/fabric_rpc.erl
+++ b/src/fabric/src/fabric_rpc.erl
@@ -440,6 +440,8 @@ make_att_reader({follows, Parser, Ref}) ->
                 throw({mp_parser_died, Reason})
         end
     end;
+make_att_reader({fabric_attachment_receiver, Middleman, Length}) ->
+    fabric_doc_atts:receiver_callback(Middleman, Length);
 make_att_reader(Else) ->
     Else.