Posted to dev@couchdb.apache.org by Adam Kocoloski <ko...@apache.org> on 2009/05/16 22:20:39 UTC

Re: svn commit: r775507 - /couchdb/trunk/src/couchdb/couch_rep.erl

Ok, so here's a start at reworking some of the memory management and  
buffering calculations.  It fixes the regression where attachment  
memory wasn't being included in the memory utilization numbers, and it  
also includes ibrowse memory utilization for attachments (which is  
larger than Couch's).

The decision to flush the buffer (to disk or to the remote target  
server) is dependent on the number of docs in the buffer, the  
approximate number of attachments, and the memory utilization.  I  
estimate the number of attachments as 0.5*nlinks, since every  
attachment download spawns two processes: one dedicated ibrowse worker  
and the attachment receiver.  The dedicated ibrowse workers get the  
attachments out of the connection pool and let us keep a better eye on  
their memory usage.
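
For illustration only, here is a minimal sketch of that decision as a pure function. It assumes the three inputs are passed in directly (the module name flush_sketch and the function approx_attachments/1 are made up for this example), reuses the threshold values from the macros in the commit below, and leaves out the garbage-collect-and-recheck step that the real should_flush performs:

```erlang
-module(flush_sketch).
-export([should_flush/3, approx_attachments/1]).

%% Threshold values copied from the macros in the commit.
-define(BUFFER_NDOCS, 1000).
-define(BUFFER_NATTACHMENTS, 50).
-define(BUFFER_MEMORY, 10000000). %% bytes

%% Each attachment download accounts for two linked processes
%% (a dedicated ibrowse worker and the attachment receiver).
approx_attachments(NLinks) ->
    NLinks / 2.

%% Hypothetical pure version of the decision: the caller supplies the
%% doc count, link count, and memory footprint instead of the function
%% reading them from process_info.
should_flush(NDocs, _NLinks, _MemBytes) when NDocs > ?BUFFER_NDOCS ->
    true;
should_flush(_NDocs, NLinks, MemBytes) ->
    approx_attachments(NLinks) > ?BUFFER_NATTACHMENTS
        orelse MemBytes > ?BUFFER_MEMORY.
```

With these thresholds, 102 linked processes (roughly 51 attachments) would trigger a flush even if the buffer holds only a handful of docs.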

Each of the thresholds is currently just defined as a macro at the top  
of the module.  I haven't done any work on adjusting these thresholds  
dynamically or checkpointing as a function of elapsed time.

The replication module is getting pretty hairy again; in my opinion  
it's probably time to refactor out the attachment stuff into its own  
module.  I may get around to that tomorrow if no one objects.

Best, Adam

On May 16, 2009, at 2:58 PM, kocolosk@apache.org wrote:

> Author: kocolosk
> Date: Sat May 16 18:58:18 2009
> New Revision: 775507
>
> URL: http://svn.apache.org/viewvc?rev=775507&view=rev
> Log:
> replicator memory management and buffer flush calculation updates
>
> * new should_flush fun considers ndocs, nattachments, memory in making decision
> * memory utilized by attachment receivers is accounted for
> * download attachments using standalone connections instead of conn pool.  This
>   prevents a document request from getting stuck behind a huge attachment, which
>   would prevent us from triggering a buffer flush in time.  We also consider the
>   memory utilization of the standalone ibrowse connection in should_flush
>
> Modified:
>    couchdb/trunk/src/couchdb/couch_rep.erl
>
> Modified: couchdb/trunk/src/couchdb/couch_rep.erl
> URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=775507&r1=775506&r2=775507&view=diff
> ==============================================================================
> --- couchdb/trunk/src/couchdb/couch_rep.erl (original)
> +++ couchdb/trunk/src/couchdb/couch_rep.erl Sat May 16 18:58:18 2009
> @@ -17,7 +17,12 @@
>
> -export([replicate/2]).
>
> +-define(BUFFER_NDOCS, 1000).
> +-define(BUFFER_NATTACHMENTS, 50).
> +-define(BUFFER_MEMORY, 10000000). %% bytes
> +
> -include("couch_db.hrl").
> +-include("../ibrowse/ibrowse.hrl").
>
> %% @spec replicate(Source::binary(), Target::binary()) ->
> %%      {ok, Stats} | {error, Reason}
> @@ -202,7 +207,8 @@
>     ets:update_counter(Stats, docs_read, length(Docs)),
>
>     %% save them (maybe in a buffer)
> -    {NewBuffer, NewContext} = case couch_util:should_flush() of
> +    {NewBuffer, NewContext} =
> +    case should_flush(lists:flatlength([Docs|Buffer])) of
>         true ->
>             Docs2 = lists:flatten([Docs|Buffer]),
>             {ok, Errors} = update_docs(Target, Docs2, [], replicated_changes),
> @@ -222,7 +228,7 @@
>     ets:update_counter(State#state.stats, total_revs, RevsCount),
>     case State#state.listeners of
>     [] ->
> -        % still waiting for the first listener to sen a request
> +        % still waiting for the first listener to send a request
>         {noreply, State#state{current_seq=LastSeq,done=true}};
>     _ ->
>         {stop, normal, ok, State#state{current_seq=LastSeq}}
> @@ -327,13 +333,13 @@
>         [Id, couch_doc:rev_to_str(Rev), Error]),
>     dump_update_errors(Rest).
>
> -attachment_loop(ReqId) ->
> +attachment_loop(ReqId, Conn) ->
>     couch_util:should_flush(),
>     receive
>         {From, {set_req_id, NewId}} ->
>             %% we learn the ReqId to listen for
>             From ! {self(), {ok, NewId}},
> -            attachment_loop(NewId);
> +            attachment_loop(NewId, Conn);
>         {ibrowse_async_headers, ReqId, Status, Headers} ->
>             %% we got header, give the controlling process a chance to react
>             receive
> @@ -343,37 +349,42 @@
>                     receive
>                         {From, continue} ->
>                             %% normal case
> -                            attachment_loop(ReqId);
> +                            attachment_loop(ReqId, Conn);
>                         {From, fail} ->
>                             %% error, failure code
>                             ?LOG_ERROR(
>                                 "streaming attachment failed with status ~p",
>                                 [Status]),
> +                            catch ibrowse:stop_worker_process(Conn),
>                             exit(attachment_request_failed);
>                         {From, stop_ok} ->
>                             %% stop looping, controller will start a new loop
> +                            catch ibrowse:stop_worker_process(Conn),
>                             stop_ok
>                     end
>             end,
> -            attachment_loop(ReqId);
> +            attachment_loop(ReqId, Conn);
>         {ibrowse_async_response, ReqId, {chunk_start,_}} ->
> -            attachment_loop(ReqId);
> +            attachment_loop(ReqId, Conn);
>         {ibrowse_async_response, ReqId, chunk_end} ->
> -            attachment_loop(ReqId);
> +            attachment_loop(ReqId, Conn);
>         {ibrowse_async_response, ReqId, {error, Err}} ->
>             ?LOG_ERROR("streaming attachment failed with ~p", [Err]),
> +            catch ibrowse:stop_worker_process(Conn),
>             exit(attachment_request_failed);
>         {ibrowse_async_response, ReqId, Data} ->
>             receive {From, gimme_data} -> From ! {self(), Data} end,
> -            attachment_loop(ReqId);
> -        {ibrowse_async_response_end, ReqId} -> ok
> +            attachment_loop(ReqId, Conn);
> +        {ibrowse_async_response_end, ReqId} ->
> +            catch ibrowse:stop_worker_process(Conn),
> +            exit(normal)
>     end.
>
> attachment_stub_converter(DbS, Id, Rev, {Name, {stub, Type, Length}}) ->
>     #http_db{uri=DbUrl, headers=Headers} = DbS,
>     {Pos, [RevId|_]} = Rev,
>     Url = lists:flatten([DbUrl, url_encode(Id), "/", url_encode(?b2l(Name)),
> -        "?rev=", couch_doc:rev_to_str({Pos,RevId})]),
> +        "?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]),
>     ?LOG_DEBUG("Attachment URL ~p", [Url]),
>     {ok, RcvFun} = make_attachment_stub_receiver(Url, Headers, Name,
>         Type, Length),
> @@ -389,11 +400,14 @@
>
> make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) ->
>     %% start the process that receives attachment data from ibrowse
> -    Pid = spawn_link(fun() -> attachment_loop(nil) end),
> +    #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
> +    {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port),
> +    Pid = spawn_link(fun() -> attachment_loop(nil, Conn) end),
>
>     %% make the async request
> -    Options = [{stream_to, Pid}, {response_format, binary}],
> -    ReqId = case ibrowse:send_req(Url, Headers, get, [], Options, infinity) of
> +    Opts = [{stream_to, Pid}, {response_format, binary}],
> +    ReqId =
> +    case ibrowse:send_req_direct(Conn, Url, Headers, get, [], Opts, infinity) of
>         {ibrowse_req_id, X} -> X;
>         {error, _Reason} -> exit(attachment_request_failed)
>     end,
> @@ -717,6 +731,46 @@
> open_doc_revs(Db, DocId, Revs, Options) ->
>     couch_db:open_doc_revs(Db, DocId, Revs, Options).
>
> +%% @spec should_flush() -> true | false
> +%% @doc Calculates whether it's time to flush the document buffer. Considers
> +%%        - memory utilization
> +%%        - number of pending document writes
> +%%        - approximate number of pending attachment writes
> +should_flush(DocCount) when DocCount > ?BUFFER_NDOCS ->
> +    true;
> +should_flush(_DocCount) ->
> +    MeAndMyLinks = [self()|element(2,process_info(self(),links))],
> +
> +    case length(MeAndMyLinks)/2 > ?BUFFER_NATTACHMENTS of
> +    true -> true;
> +    false ->
> +        case memory_footprint(MeAndMyLinks) > 2*?BUFFER_MEMORY of
> +        true ->
> +            [garbage_collect(Pid) || Pid <- MeAndMyLinks],
> +            memory_footprint(MeAndMyLinks) > ?BUFFER_MEMORY;
> +        false -> false
> +        end
> +    end.
> +
> +%% @spec memory_footprint([pid()]) -> integer()
> +%% @doc Sum of process and binary memory utilization for all processes in list
> +memory_footprint(PidList) ->
> +    ProcessMemory = lists:foldl(fun(Pid, Acc) ->
> +        Acc + element(2,process_info(Pid, memory))
> +    end, 0, PidList),
> +
> +    BinaryMemory = lists:foldl(fun(Pid, Acc) ->
> +        Acc + binary_memory(Pid)
> +    end, 0, PidList),
> +
> +    ?LOG_DEBUG("ProcessMem ~p BinaryMem ~p", [ProcessMemory, BinaryMemory]),
> +    ProcessMemory + BinaryMemory.
> +
> +%% @spec binary_memory(pid()) -> integer()
> +%% @doc Memory utilization of all binaries referenced by this process.
> +binary_memory(Pid) ->
> +    lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
> +        0, element(2,process_info(Pid, binary))).
>
> update_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, Options) ->
>     [] = Options,
>
>


Re: svn commit: r775507 - /couchdb/trunk/src/couchdb/couch_rep.erl

Posted by Adam Kocoloski <ko...@apache.org>.
On May 16, 2009, at 8:44 PM, Antony Blakey wrote:

> On 17/05/2009, at 4:20 AM, Adam Kocoloski wrote:
>
>> Ok, so here's a start at reworking some of the memory management  
>> and buffering calculations.  It fixes the regression where  
>> attachment memory wasn't being included in the memory utilization  
>> numbers, and it also includes ibrowse memory utilization for  
>> attachments (which is larger than Couch's).
>>
>> The decision to flush the buffer (to disk or to the remote target  
>> server) is dependent on the number of docs in the buffer, the  
>> approximate number of attachments, and the memory utilization.  I  
>> estimate the number of attachments as 0.5*nlinks, since every  
>> attachment download spawns two processes: one dedicated ibrowse  
>> worker and the attachment receiver.  The dedicated ibrowse workers  
>> get the attachments out of the connection pool and let us keep a  
>> better eye on their memory usage.
>>
>> Each of the thresholds is currently just defined as a macro at the  
>> top of the module.  I haven't done any work on adjusting these  
>> thresholds dynamically or checkpointing as a function of elapsed  
>> time.
>>
>> The replication module is getting pretty hairy again; in my opinion  
>> it's probably time to refactor out the attachment stuff into its own  
>> module.  I may get around to that tomorrow if no one objects.
>
> What do you think about adding binary backoff to help with  
> unreliable links? Even if attachments are buffered to disk there's  
> still the issue of making checkpoint progress in the face of link  
> failure. Or maybe checkpoint the buffer on any failure (although  
> that won't help the situation where couchdb quits).

We already try to checkpoint in the event of failure, but it doesn't  
really help much because the checkpoint record has to be saved on both  
source and target in order to be recognized the next time around.

I'm definitely in favor of adding code to help the replicator make  
progress in the face of link failure, whether that be some sort of  
backoff algorithm, caching and reusing partial attachment downloads,  
etc.
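
To make the backoff idea concrete, here is a rough sketch of a retry loop with exponentially growing delays. Nothing here exists in couch_rep today: the module name backoff_sketch, the functions delay/3 and retry/3, and the 60 second cap are all assumptions chosen for the example, and Fun stands in for whatever request we would want to retry:

```erlang
-module(backoff_sketch).
-export([delay/3, retry/3]).

%% Delay in milliseconds before retry number Attempt (0-based):
%% Base * 2^Attempt, capped at Max.
delay(Attempt, Base, Max) when is_integer(Attempt), Attempt >= 0 ->
    min(Max, Base * (1 bsl Attempt)).

%% Call Fun() until it returns {ok, _} or Retries retries have been used.
%% Fun must return {ok, Result} or {error, Reason}.
retry(Fun, Retries, Base) ->
    retry(Fun, Retries, Base, 0).

retry(Fun, Retries, Base, Attempt) ->
    case Fun() of
        {ok, _} = Ok ->
            Ok;
        {error, _} = Error when Attempt >= Retries ->
            %% out of retries, hand the last error back to the caller
            Error;
        {error, _} ->
            %% wait longer after each failure; 60000 ms cap is an assumption
            timer:sleep(delay(Attempt, Base, 60000)),
            retry(Fun, Retries, Base, Attempt + 1)
    end.
```

A caller would wrap a single request in a fun, e.g. backoff_sketch:retry(fun() -> Request() end, 5, 100), where Request is a placeholder for the actual call.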

Best, Adam

Re: svn commit: r775507 - /couchdb/trunk/src/couchdb/couch_rep.erl

Posted by Antony Blakey <an...@gmail.com>.
On 17/05/2009, at 4:20 AM, Adam Kocoloski wrote:

> Ok, so here's a start at reworking some of the memory management and  
> buffering calculations.  It fixes the regression where attachment  
> memory wasn't being included in the memory utilization numbers, and  
> it also includes ibrowse memory utilization for attachments (which  
> is larger than Couch's).
>
> The decision to flush the buffer (to disk or to the remote target  
> server) is dependent on the number of docs in the buffer, the  
> approximate number of attachments, and the memory utilization.  I  
> estimate the number of attachments as 0.5*nlinks, since every  
> attachment download spawns two processes: one dedicated ibrowse  
> worker and the attachment receiver.  The dedicated ibrowse workers  
> get the attachments out of the connection pool and let us keep a  
> better eye on their memory usage.
>
> Each of the thresholds is currently just defined as a macro at the  
> top of the module.  I haven't done any work on adjusting these  
> thresholds dynamically or checkpointing as a function of elapsed time.
>
> The replication module is getting pretty hairy again; in my opinion  
> it's probably time to refactor out the attachment stuff into its own  
> module.  I may get around to that tomorrow if no one objects.

What do you think about adding binary backoff to help with unreliable  
links? Even if attachments are buffered to disk there's still the  
issue of making checkpoint progress in the face of link failure. Or  
maybe checkpoint the buffer on any failure (although that won't help  
the situation where couchdb quits).

Antony Blakey
--------------------------
CTO, Linkuistics Pty Ltd
Ph: 0438 840 787

Isn't it enough to see that a garden is beautiful without having to  
believe that there are fairies at the bottom of it too?
   -- Douglas Adams