Posted to dev@couchdb.apache.org by Adam Kocoloski <ko...@apache.org> on 2009/05/16 22:20:39 UTC
Re: svn commit: r775507 - /couchdb/trunk/src/couchdb/couch_rep.erl
Ok, so here's a start at reworking some of the memory management and
buffering calculations. It fixes the regression where attachment
memory wasn't being included in the memory utilization numbers, and it
also includes ibrowse memory utilization for attachments (which is
larger than Couch's).
The decision to flush the buffer (to disk or to the remote target
server) is dependent on the number of docs in the buffer, the
approximate number of attachments, and the memory utilization. I
estimate the number of attachments as 0.5*nlinks, since every
attachment download spawns two processes: one dedicated ibrowse worker
and the attachment receiver. The dedicated ibrowse workers get the
attachments out of the connection pool and let us keep a better eye on
their memory usage.
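For illustration, the estimate boils down to something like the following sketch (estimated_attachments/0 is a made-up helper, not code from the patch; the real check lives in should_flush below, which also counts self() among the links):

    %% Hypothetical helper illustrating the 0.5*nlinks estimate: every
    %% in-flight attachment contributes two linked processes (the
    %% dedicated ibrowse worker plus the attachment receiver).
    estimated_attachments() ->
        {links, Links} = process_info(self(), links),
        length(Links) div 2.

So with 20 linked processes the replicator assumes roughly 10 attachments are in flight.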
Each of the thresholds is currently just defined as a macro at the top
of the module. I haven't done any work on adjusting these thresholds
dynamically or checkpointing as a function of elapsed time.
The replication module is getting pretty hairy again; in my opinion
it's probably time to refactor out the attachment stuff into its own
module. I may get around to that tomorrow if no one objects.
Best, Adam
On May 16, 2009, at 2:58 PM, kocolosk@apache.org wrote:
> Author: kocolosk
> Date: Sat May 16 18:58:18 2009
> New Revision: 775507
>
> URL: http://svn.apache.org/viewvc?rev=775507&view=rev
> Log:
> replicator memory management and buffer flush calculation updates
>
> * new should_flush fun considers ndocs, nattachments, memory in making decision
> * memory utilized by attachment receivers is accounted for
> * download attachments using standalone connections instead of conn pool.
>   This prevents a document request from getting stuck behind a huge
>   attachment, which would prevent us from triggering a buffer flush in time.
>   We also consider the memory utilization of the standalone ibrowse
>   connection in should_flush
>
> Modified:
> couchdb/trunk/src/couchdb/couch_rep.erl
>
> Modified: couchdb/trunk/src/couchdb/couch_rep.erl
> URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep.erl?rev=775507&r1=775506&r2=775507&view=diff
> ==============================================================================
> --- couchdb/trunk/src/couchdb/couch_rep.erl (original)
> +++ couchdb/trunk/src/couchdb/couch_rep.erl Sat May 16 18:58:18 2009
> @@ -17,7 +17,12 @@
>
> -export([replicate/2]).
>
> +-define(BUFFER_NDOCS, 1000).
> +-define(BUFFER_NATTACHMENTS, 50).
> +-define(BUFFER_MEMORY, 10000000). %% bytes
> +
> -include("couch_db.hrl").
> +-include("../ibrowse/ibrowse.hrl").
>
> %% @spec replicate(Source::binary(), Target::binary()) ->
> %% {ok, Stats} | {error, Reason}
> @@ -202,7 +207,8 @@
> ets:update_counter(Stats, docs_read, length(Docs)),
>
> %% save them (maybe in a buffer)
> - {NewBuffer, NewContext} = case couch_util:should_flush() of
> + {NewBuffer, NewContext} =
> + case should_flush(lists:flatlength([Docs|Buffer])) of
> true ->
> Docs2 = lists:flatten([Docs|Buffer]),
> {ok, Errors} = update_docs(Target, Docs2, [], replicated_changes),
> @@ -222,7 +228,7 @@
> ets:update_counter(State#state.stats, total_revs, RevsCount),
> case State#state.listeners of
> [] ->
> - % still waiting for the first listener to sen a request
> + % still waiting for the first listener to send a request
> {noreply, State#state{current_seq=LastSeq,done=true}};
> _ ->
> {stop, normal, ok, State#state{current_seq=LastSeq}}
> @@ -327,13 +333,13 @@
> [Id, couch_doc:rev_to_str(Rev), Error]),
> dump_update_errors(Rest).
>
> -attachment_loop(ReqId) ->
> +attachment_loop(ReqId, Conn) ->
> couch_util:should_flush(),
> receive
> {From, {set_req_id, NewId}} ->
> %% we learn the ReqId to listen for
> From ! {self(), {ok, NewId}},
> - attachment_loop(NewId);
> + attachment_loop(NewId, Conn);
> {ibrowse_async_headers, ReqId, Status, Headers} ->
> %% we got header, give the controlling process a chance to react
> receive
> @@ -343,37 +349,42 @@
> receive
> {From, continue} ->
> %% normal case
> - attachment_loop(ReqId);
> + attachment_loop(ReqId, Conn);
> {From, fail} ->
> %% error, failure code
> ?LOG_ERROR("streaming attachment failed with status ~p", [Status]),
> + catch ibrowse:stop_worker_process(Conn),
> exit(attachment_request_failed);
> {From, stop_ok} ->
> %% stop looping, controller will start a new loop
> + catch ibrowse:stop_worker_process(Conn),
> stop_ok
> end
> end,
> - attachment_loop(ReqId);
> + attachment_loop(ReqId, Conn);
> {ibrowse_async_response, ReqId, {chunk_start,_}} ->
> - attachment_loop(ReqId);
> + attachment_loop(ReqId, Conn);
> {ibrowse_async_response, ReqId, chunk_end} ->
> - attachment_loop(ReqId);
> + attachment_loop(ReqId, Conn);
> {ibrowse_async_response, ReqId, {error, Err}} ->
> ?LOG_ERROR("streaming attachment failed with ~p", [Err]),
> + catch ibrowse:stop_worker_process(Conn),
> exit(attachment_request_failed);
> {ibrowse_async_response, ReqId, Data} ->
> receive {From, gimme_data} -> From ! {self(), Data} end,
> - attachment_loop(ReqId);
> - {ibrowse_async_response_end, ReqId} -> ok
> + attachment_loop(ReqId, Conn);
> + {ibrowse_async_response_end, ReqId} ->
> + catch ibrowse:stop_worker_process(Conn),
> + exit(normal)
> end.
>
> attachment_stub_converter(DbS, Id, Rev, {Name, {stub, Type, Length}}) ->
> #http_db{uri=DbUrl, headers=Headers} = DbS,
> {Pos, [RevId|_]} = Rev,
> Url = lists:flatten([DbUrl, url_encode(Id), "/", url_encode(?b2l(Name)),
> - "?rev=", couch_doc:rev_to_str({Pos,RevId})]),
> + "?rev=", ?b2l(couch_doc:rev_to_str({Pos,RevId}))]),
> ?LOG_DEBUG("Attachment URL ~p", [Url]),
> {ok, RcvFun} = make_attachment_stub_receiver(Url, Headers, Name, Type, Length),
> @@ -389,11 +400,14 @@
>
> make_attachment_stub_receiver(Url, Headers, Name, Type, Length, Retries) ->
> %% start the process that receives attachment data from ibrowse
> - Pid = spawn_link(fun() -> attachment_loop(nil) end),
> + #url{host=Host, port=Port} = ibrowse_lib:parse_url(Url),
> + {ok, Conn} = ibrowse:spawn_link_worker_process(Host, Port),
> + Pid = spawn_link(fun() -> attachment_loop(nil, Conn) end),
>
> %% make the async request
> - Options = [{stream_to, Pid}, {response_format, binary}],
> - ReqId = case ibrowse:send_req(Url, Headers, get, [], Options, infinity) of
> + Opts = [{stream_to, Pid}, {response_format, binary}],
> + ReqId =
> + case ibrowse:send_req_direct(Conn, Url, Headers, get, [], Opts, infinity) of
> {ibrowse_req_id, X} -> X;
> {error, _Reason} -> exit(attachment_request_failed)
> end,
> @@ -717,6 +731,46 @@
> open_doc_revs(Db, DocId, Revs, Options) ->
> couch_db:open_doc_revs(Db, DocId, Revs, Options).
>
> +%% @spec should_flush(integer()) -> true | false
> +%% @doc Calculates whether it's time to flush the document buffer. Considers
> +%% - memory utilization
> +%% - number of pending document writes
> +%% - approximate number of pending attachment writes
> +should_flush(DocCount) when DocCount > ?BUFFER_NDOCS ->
> + true;
> +should_flush(_DocCount) ->
> + MeAndMyLinks = [self()|element(2,process_info(self(),links))],
> +
> + case length(MeAndMyLinks)/2 > ?BUFFER_NATTACHMENTS of
> + true -> true;
> + false ->
> + case memory_footprint(MeAndMyLinks) > 2*?BUFFER_MEMORY of
> + true ->
> + [garbage_collect(Pid) || Pid <- MeAndMyLinks],
> + memory_footprint(MeAndMyLinks) > ?BUFFER_MEMORY;
> + false -> false
> + end
> + end.
> +
> +%% @spec memory_footprint([pid()]) -> integer()
> +%% @doc Sum of process and binary memory utilization for all processes in list
> +memory_footprint(PidList) ->
> + ProcessMemory = lists:foldl(fun(Pid, Acc) ->
> + Acc + element(2,process_info(Pid, memory))
> + end, 0, PidList),
> +
> + BinaryMemory = lists:foldl(fun(Pid, Acc) ->
> + Acc + binary_memory(Pid)
> + end, 0, PidList),
> +
> + ?LOG_DEBUG("ProcessMem ~p BinaryMem ~p", [ProcessMemory, BinaryMemory]),
> + ProcessMemory + BinaryMemory.
> +
> +%% @spec binary_memory(pid()) -> integer()
> +%% @doc Memory utilization of all binaries referenced by this process.
> +binary_memory(Pid) ->
> + lists:foldl(fun({_Id, Size, _NRefs}, Acc) -> Size+Acc end,
> + 0, element(2,process_info(Pid, binary))).
>
> update_doc(#http_db{uri=DbUrl, headers=Headers}, #doc{id=DocId}=Doc, Options) ->
> [] = Options,
>
>
Re: svn commit: r775507 - /couchdb/trunk/src/couchdb/couch_rep.erl
Posted by Adam Kocoloski <ko...@apache.org>.
On May 16, 2009, at 8:44 PM, Antony Blakey wrote:
> On 17/05/2009, at 4:20 AM, Adam Kocoloski wrote:
>
>> Ok, so here's a start at reworking some of the memory management
>> and buffering calculations. It fixes the regression where
>> attachment memory wasn't being included in the memory utilization
>> numbers, and it also includes ibrowse memory utilization for
>> attachments (which is larger than Couch's).
>>
>> The decision to flush the buffer (to disk or to the remote target
>> server) is dependent on the number of docs in the buffer, the
>> approximate number of attachments, and the memory utilization. I
>> estimate the number of attachments as 0.5*nlinks, since every
>> attachment download spawns two processes: one dedicated ibrowse
>> worker and the attachment receiver. The dedicated ibrowse workers
>> get the attachments out of the connection pool and let us keep a
>> better eye on their memory usage.
>>
>> Each of the thresholds is currently just defined as a macro at the
>> top of the module. I haven't done any work on adjusting these
>> thresholds dynamically or checkpointing as a function of elapsed
>> time.
>>
>> The replication module is getting pretty hairy again; in my opinion
>> it's probably time to refactor out the attachment stuff into its own
>> module. I may get around to that tomorrow if no one objects.
>
> What do you think about adding binary backoff to help with
> unreliable links? Even if attachments are buffered to disk there's
> still the issue of making checkpoint progress in the face of link
> failure. Or maybe checkpoint the buffer on any failure (although
> that won't help the situation where couchdb quits).
We already try to checkpoint in the event of failure, but it doesn't
really help much because the checkpoint record has to be saved on both
source and target in order to be recognized the next time around.
I'm definitely in favor of adding code to help the replicator make
progress in the face of link failure, whether that be some sort of
backoff algorithm, caching and reusing partial attachment downloads,
etc.
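To make the backoff idea concrete, here's a rough sketch of the sort of retry wrapper I have in mind (entirely hypothetical; with_backoff/1 and the delay constants don't exist in couch_rep, and the numbers are made up):

    %% Hypothetical exponential-backoff wrapper for flaky attachment
    %% requests; sleeps 1s, 2s, 4s, ... between attempts and gives up
    %% after MAX_ATTACH_RETRIES failures.
    -define(MAX_ATTACH_RETRIES, 5).
    -define(BASE_DELAY_MS, 1000).

    with_backoff(Fun) ->
        with_backoff(Fun, 0).

    with_backoff(_Fun, ?MAX_ATTACH_RETRIES) ->
        exit(attachment_request_failed);
    with_backoff(Fun, Attempt) ->
        case catch Fun() of
            {ok, Result} ->
                {ok, Result};
            _Error ->
                %% back off ?BASE_DELAY_MS * 2^Attempt ms before retrying
                timer:sleep(?BASE_DELAY_MS bsl Attempt),
                with_backoff(Fun, Attempt + 1)
        end.

Something like that could wrap the attachment request so a dropped link doesn't immediately fail the whole replication.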
Best, Adam
Re: svn commit: r775507 - /couchdb/trunk/src/couchdb/couch_rep.erl
Posted by Antony Blakey <an...@gmail.com>.
On 17/05/2009, at 4:20 AM, Adam Kocoloski wrote:
> Ok, so here's a start at reworking some of the memory management and
> buffering calculations. It fixes the regression where attachment
> memory wasn't being included in the memory utilization numbers, and
> it also includes ibrowse memory utilization for attachments (which
> is larger than Couch's).
>
> The decision to flush the buffer (to disk or to the remote target
> server) is dependent on the number of docs in the buffer, the
> approximate number of attachments, and the memory utilization. I
> estimate the number of attachments as 0.5*nlinks, since every
> attachment download spawns two processes: one dedicated ibrowse
> worker and the attachment receiver. The dedicated ibrowse workers
> get the attachments out of the connection pool and let us keep a
> better eye on their memory usage.
>
> Each of the thresholds is currently just defined as a macro at the
> top of the module. I haven't done any work on adjusting these
> thresholds dynamically or checkpointing as a function of elapsed time.
>
> The replication module is getting pretty hairy again; in my opinion
> it's probably time to refactor out the attachment stuff into its own
> module. I may get around to that tomorrow if no one objects.
What do you think about adding binary backoff to help with unreliable
links? Even if attachments are buffered to disk there's still the
issue of making checkpoint progress in the face of link failure. Or
maybe checkpoint the buffer on any failure (although that won't help
the situation where couchdb quits).
Antony Blakey
--------------------------
CTO, Linkuistics Pty Ltd
Ph: 0438 840 787
Isn't it enough to see that a garden is beautiful without having to
believe that there are fairies at the bottom of it too?
-- Douglas Adams