You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@couchdb.apache.org by Benoit Chesneau <bc...@gmail.com> on 2011/01/25 18:31:11 UTC

Re: svn commit: r1062772 - /couchdb/trunk/src/couchdb/couch_rep_db_listener.erl

On Mon, Jan 24, 2011 at 2:46 PM,  <fd...@apache.org> wrote:
> Author: fdmanana
> Date: Mon Jan 24 13:46:11 2011
> New Revision: 1062772
>
> URL: http://svn.apache.org/viewvc?rev=1062772&view=rev
> Log:
> Refactoring of the replicator database listener
>
> Simpler implementation and more reliable behaviour when the replicator
> database is deleted or changed on the fly.
>
> Modified:
>    couchdb/trunk/src/couchdb/couch_rep_db_listener.erl
>
> Modified: couchdb/trunk/src/couchdb/couch_rep_db_listener.erl
> URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_rep_db_listener.erl?rev=1062772&r1=1062771&r2=1062772&view=diff
> ==============================================================================
> --- couchdb/trunk/src/couchdb/couch_rep_db_listener.erl (original)
> +++ couchdb/trunk/src/couchdb/couch_rep_db_listener.erl Mon Jan 24 13:46:11 2011
> @@ -18,98 +18,113 @@
>
>  -include("couch_db.hrl").
>
> --define(DOC_TO_REP_ID_MAP, rep_doc_id_to_rep_id).
> --define(REP_ID_TO_DOC_ID_MAP, rep_id_to_rep_doc_id).
> +-define(DOC_ID_TO_REP_ID, rep_doc_id_to_rep_id).
> +-define(REP_ID_TO_DOC_ID, rep_id_to_rep_doc_id).
>
>  -record(state, {
>     changes_feed_loop = nil,
> -    changes_queue = nil,
> -    changes_processor = nil,
> -    db_notifier = nil
> +    db_notifier = nil,
> +    rep_db_name = nil,
> +    rep_start_pids = []
>  }).
>
> +-import(couch_util, [
> +    get_value/2,
> +    get_value/3
> +]).
> +
>
>  start_link() ->
>     gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
>
>  init(_) ->
>     process_flag(trap_exit, true),
> -    {ok, Queue} = couch_work_queue:new(
> -        [{max_size, 1024 * 1024}, {max_items, 1000}]),
> -    {ok, Processor} = changes_processor(Queue),
> -    {ok, Loop} = changes_feed_loop(Queue),
> +    ?DOC_ID_TO_REP_ID = ets:new(?DOC_ID_TO_REP_ID, [named_table, set, private]),
> +    ?REP_ID_TO_DOC_ID = ets:new(?REP_ID_TO_DOC_ID, [named_table, set, private]),
>     Server = self(),
>     ok = couch_config:register(
> -        fun("replicator", "db") ->
> -            ok = gen_server:cast(Server, rep_db_changed)
> +        fun("replicator", "db", NewName) ->
> +            ok = gen_server:cast(Server, {rep_db_changed, ?l2b(NewName)})
>         end
>     ),
> +    {Loop, RepDbName} = changes_feed_loop(),
>     {ok, #state{
>         changes_feed_loop = Loop,
> -        changes_queue = Queue,
> -        changes_processor = Processor,
> +        rep_db_name = RepDbName,
>         db_notifier = db_update_notifier()}
>     }.
>
>
> +handle_call({rep_db_update, Change}, _From, State) ->
> +    {reply, ok, process_update(State, Change)};
> +
>  handle_call(Msg, From, State) ->
>     ?LOG_ERROR("Replicator DB listener received unexpected call ~p from ~p",
>         [Msg, From]),
>     {stop, {error, {unexpected_call, Msg}}, State}.
>
>
> -handle_cast(rep_db_changed, State) ->
> -    #state{
> -        changes_feed_loop = Loop,
> -        changes_queue = Queue
> -    } = State,
> -    catch unlink(Loop),
> -    catch exit(Loop, rep_db_changed),
> -    couch_work_queue:queue(Queue, stop_all_replications),
> -    {ok, NewLoop} = changes_feed_loop(Queue),
> -    {noreply, State#state{changes_feed_loop = NewLoop}};
> -
> -handle_cast(rep_db_created, #state{changes_feed_loop = Loop} = State) ->
> -    catch unlink(Loop),
> -    catch exit(Loop, rep_db_changed),
> -    {ok, NewLoop} = changes_feed_loop(State#state.changes_queue),
> -    {noreply, State#state{changes_feed_loop = NewLoop}};
> +handle_cast({rep_db_changed, NewName},
> +        #state{rep_db_name = NewName} = State) ->
> +    {noreply, State};
> +
> +handle_cast({rep_db_changed, _NewName}, State) ->
> +    {noreply, restart(State)};
> +
> +handle_cast({rep_db_created, NewName},
> +        #state{rep_db_name = NewName} = State) ->
> +    {noreply, State};
> +
> +handle_cast({rep_db_created, _NewName}, State) ->
> +    {noreply, restart(State)};
>
>  handle_cast(Msg, State) ->
>     ?LOG_ERROR("Replicator DB listener received unexpected cast ~p", [Msg]),
>     {stop, {error, {unexpected_cast, Msg}}, State}.
>
> +
>  handle_info({'EXIT', From, normal}, #state{changes_feed_loop = From} = State) ->
>     % replicator DB deleted
> -    couch_work_queue:queue(State#state.changes_queue, stop_all_replications),
> -    {noreply, State#state{changes_feed_loop = nil}};
> +    {noreply, State#state{changes_feed_loop = nil, rep_db_name = nil}};
>
>  handle_info({'EXIT', From, Reason}, #state{db_notifier = From} = State) ->
>     ?LOG_ERROR("Database update notifier died. Reason: ~p", [Reason]),
>     {stop, {db_update_notifier_died, Reason}, State};
>
> -handle_info({'EXIT', From, Reason}, #state{changes_processor = From} = State) ->
> -    ?LOG_ERROR("Replicator DB changes processor died. Reason: ~p", [Reason]),
> -    {stop, {rep_db_changes_processor_died, Reason}, State}.
> +handle_info({'EXIT', From, normal}, #state{rep_start_pids = Pids} = State) ->
> +    % one of the replication start processes terminated successfully
> +    {noreply, State#state{rep_start_pids = Pids -- [From]}};
> +
> +handle_info(Msg, State) ->
> +    ?LOG_ERROR("Replicator DB listener received unexpected message ~p", [Msg]),
> +    {stop, {unexpected_msg, Msg}, State}.
>
>
>  terminate(_Reason, State) ->
>     #state{
> +        rep_start_pids = StartPids,
>         changes_feed_loop = Loop,
> -        changes_queue = Queue
> +        db_notifier = Notifier
>     } = State,
> -    exit(Loop, stop),
> -    % closing the queue will cause changes_processor to shutdown
> -    couch_work_queue:close(Queue),
> -    ok.
> +    stop_all_replications(),
> +    lists:foreach(
> +        fun(Pid) ->
> +            catch unlink(Pid),
> +            catch exit(Pid, stop)
> +        end,
> +        [Loop | StartPids]),
> +    true = ets:delete(?REP_ID_TO_DOC_ID),
> +    true = ets:delete(?DOC_ID_TO_REP_ID),
> +    couch_db_update_notifier:stop(Notifier).
>
>
>  code_change(_OldVsn, State, _Extra) ->
>     {ok, State}.
>
>
> -changes_feed_loop(ChangesQueue) ->
> +changes_feed_loop() ->
>     {ok, RepDb} = couch_rep:ensure_rep_db_exists(),
> +    Server = self(),
>     Pid = spawn_link(
>         fun() ->
>             ChangesFeedFun = couch_changes:handle_changes(
> @@ -126,7 +141,8 @@ changes_feed_loop(ChangesQueue) ->
>                 fun({change, Change, _}, _) ->
>                     case has_valid_rep_id(Change) of
>                     true ->
> -                        couch_work_queue:queue(ChangesQueue, Change);
> +                        ok = gen_server:call(
> +                            Server, {rep_db_update, Change}, infinity);
>                     false ->
>                         ok
>                     end;
> @@ -137,7 +153,15 @@ changes_feed_loop(ChangesQueue) ->
>         end
>     ),
>     couch_db:close(RepDb),
> -    {ok, Pid}.
> +    {Pid, couch_db:name(RepDb)}.
> +
> +
> +has_valid_rep_id({Change}) ->
> +    has_valid_rep_id(get_value(<<"id">>, Change));
> +has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
> +    false;
> +has_valid_rep_id(_Else) ->
> +    true.
>
>
>  db_update_notifier() ->
> @@ -146,121 +170,106 @@ db_update_notifier() ->
>         fun({created, DbName}) ->
>             case ?l2b(couch_config:get("replicator", "db", "_replicator")) of
>             DbName ->
> -                ok = gen_server:cast(Server, rep_db_created);
> +                ok = gen_server:cast(Server, {rep_db_created, DbName});
>             _ ->
>                 ok
>             end;
>         (_) ->
> +            % no need to handle the 'deleted' event - the changes feed loop
> +            % dies when the database is deleted
>             ok
>         end
>     ),
>     Notifier.
>
>
> -changes_processor(ChangesQueue) ->
> -    Pid = spawn_link(
> -        fun() ->
> -            ets:new(?DOC_TO_REP_ID_MAP, [named_table, set, private]),
> -            ets:new(?REP_ID_TO_DOC_ID_MAP, [named_table, set, private]),
> -            consume_changes(ChangesQueue),
> -            true = ets:delete(?REP_ID_TO_DOC_ID_MAP),
> -            true = ets:delete(?DOC_TO_REP_ID_MAP)
> -        end
> -    ),
> -    {ok, Pid}.
> -
> -
> -consume_changes(ChangesQueue) ->
> -    case couch_work_queue:dequeue(ChangesQueue) of
> -    closed ->
> -        ok;
> -    {ok, Changes} ->
> -        lists:foreach(fun process_change/1, Changes),
> -        consume_changes(ChangesQueue)
> -    end.
> -
> -
> -has_valid_rep_id({Change}) ->
> -    has_valid_rep_id(couch_util:get_value(<<"id">>, Change));
> -has_valid_rep_id(<<?DESIGN_DOC_PREFIX, _Rest/binary>>) ->
> -    false;
> -has_valid_rep_id(_Else) ->
> -    true.
> +restart(#state{changes_feed_loop = Loop, rep_start_pids = StartPids} = State) ->
> +    stop_all_replications(),
> +    lists:foreach(
> +        fun(Pid) ->
> +            catch unlink(Pid),
> +            catch exit(Pid, rep_db_changed)
> +        end,
> +        [Loop | StartPids]),
> +    {NewLoop, NewRepDbName} = changes_feed_loop(),
> +    State#state{
> +        changes_feed_loop = NewLoop,
> +        rep_db_name = NewRepDbName,
> +        rep_start_pids = []
> +    }.
>
> -process_change(stop_all_replications) ->
> -    ?LOG_INFO("Stopping all ongoing replications because the replicator DB "
> -        "was deleted or changed", []),
> -    stop_all_replications();
>
> -process_change({Change}) ->
> -    {RepProps} = JsonRepDoc = couch_util:get_value(doc, Change),
> -    DocId = couch_util:get_value(<<"_id">>, RepProps),
> -    case couch_util:get_value(<<"deleted">>, Change, false) of
> +process_update(State, {Change}) ->
> +    {RepProps} = JsonRepDoc = get_value(doc, Change),
> +    DocId = get_value(<<"_id">>, RepProps),
> +    case get_value(<<"deleted">>, Change, false) of
>     true ->
> -        rep_doc_deleted(DocId);
> +        rep_doc_deleted(DocId),
> +        State;
>     false ->
> -        case couch_util:get_value(<<"_replication_state">>, RepProps) of
> +        case get_value(<<"_replication_state">>, RepProps) of
>         <<"completed">> ->
> -            replication_complete(DocId);
> +            replication_complete(DocId),
> +            State;
>         <<"error">> ->
> -            stop_replication(DocId);
> +            stop_replication(DocId),
> +            State;
>         <<"triggered">> ->
> -            maybe_start_replication(DocId, JsonRepDoc);
> +            maybe_start_replication(State, DocId, JsonRepDoc);
>         undefined ->
> -            maybe_start_replication(DocId, JsonRepDoc);
> -        _ ->
> -            ?LOG_ERROR("Invalid value for the `_replication_state` property"
> -                " of the replication document `~s`", [DocId])
> +            maybe_start_replication(State, DocId, JsonRepDoc)
>         end
> -    end,
> -    ok.
> +    end.
>
>
>  rep_user_ctx({RepDoc}) ->
> -    case couch_util:get_value(<<"user_ctx">>, RepDoc) of
> +    case get_value(<<"user_ctx">>, RepDoc) of
>     undefined ->
>         #user_ctx{roles = [<<"_admin">>]};
>     {UserCtx} ->
>         #user_ctx{
> -            name = couch_util:get_value(<<"name">>, UserCtx, null),
> -            roles = couch_util:get_value(<<"roles">>, UserCtx, [])
> +            name = get_value(<<"name">>, UserCtx, null),
> +            roles = get_value(<<"roles">>, UserCtx, [])
>         }
>     end.
>
>
> -maybe_start_replication(DocId, JsonRepDoc) ->
> +maybe_start_replication(State, DocId, JsonRepDoc) ->
>     UserCtx = rep_user_ctx(JsonRepDoc),
>     {BaseId, _} = RepId = couch_rep:make_replication_id(JsonRepDoc, UserCtx),
> -    case ets:lookup(?REP_ID_TO_DOC_ID_MAP, BaseId) of
> +    case ets:lookup(?REP_ID_TO_DOC_ID, BaseId) of
>     [] ->
> -        true = ets:insert(?REP_ID_TO_DOC_ID_MAP, {BaseId, DocId}),
> -        true = ets:insert(?DOC_TO_REP_ID_MAP, {DocId, RepId}),
> -        spawn_link(fun() -> start_replication(JsonRepDoc, RepId, UserCtx) end);
> +        true = ets:insert(?REP_ID_TO_DOC_ID, {BaseId, DocId}),
> +        true = ets:insert(?DOC_ID_TO_REP_ID, {DocId, RepId}),
> +        Pid = spawn_link(fun() ->
> +            start_replication(JsonRepDoc, RepId, UserCtx)
> +        end),
> +        State#state{rep_start_pids = [Pid | State#state.rep_start_pids]};
>     [{BaseId, DocId}] ->
> -        ok;
> +        State;
>     [{BaseId, OtherDocId}] ->
> -        maybe_tag_rep_doc(DocId, JsonRepDoc, ?l2b(BaseId), OtherDocId)
> +        ?LOG_INFO("The replication specified by the document `~s` was already"
> +            " triggered by the document `~s`", [DocId, OtherDocId]),
> +        maybe_tag_rep_doc(JsonRepDoc, ?l2b(BaseId)),
> +        State
>     end.
>
>
> -maybe_tag_rep_doc(DocId, {Props} = JsonRepDoc, RepId, OtherDocId) ->
> -    case couch_util:get_value(<<"_replication_id">>, Props) of
> +maybe_tag_rep_doc({Props} = JsonRepDoc, RepId) ->
> +    case get_value(<<"_replication_id">>, Props) of
>     RepId ->
>         ok;
>     _ ->
> -        ?LOG_INFO("The replication specified by the document `~s` was already"
> -            " triggered by the document `~s`", [DocId, OtherDocId]),
>         couch_rep:update_rep_doc(JsonRepDoc, [{<<"_replication_id">>, RepId}])
>     end.
>
>
> -
> -start_replication({RepProps} = RepDoc, {Base, Ext} = RepId, UserCtx) ->
> +start_replication({RepProps} = RepDoc, {Base, _} = RepId, UserCtx) ->
>     case (catch couch_rep:start_replication(RepDoc, RepId, UserCtx)) of
> -    RepPid when is_pid(RepPid) ->
> +    Pid when is_pid(Pid) ->
>         ?LOG_INFO("Document `~s` triggered replication `~s`",
> -            [couch_util:get_value(<<"_id">>, RepProps), Base ++ Ext]),
> -        couch_rep:get_result(RepPid, RepId, RepDoc, UserCtx);
> +            [get_value(<<"_id">>, RepProps), pp_rep_id(RepId)]),
> +        couch_rep:get_result(Pid, RepId, RepDoc, UserCtx);
>     Error ->
>         couch_rep:update_rep_doc(
>             RepDoc,
> @@ -269,43 +278,54 @@ start_replication({RepProps} = RepDoc, {
>                 {<<"_replication_id">>, ?l2b(Base)}
>             ]
>         ),
> -        ?LOG_ERROR("Error starting replication `~s`: ~p", [Base ++ Ext, Error])
> +        ?LOG_ERROR("Error starting replication `~s`: ~p", [pp_rep_id(RepId), Error])
>     end.
>
> +
>  rep_doc_deleted(DocId) ->
>     case stop_replication(DocId) of
> -    {ok, {Base, Ext}} ->
> +    {ok, RepId} ->
>         ?LOG_INFO("Stopped replication `~s` because replication document `~s`"
> -            " was deleted", [Base ++ Ext, DocId]);
> +            " was deleted", [pp_rep_id(RepId), DocId]);
>     none ->
>         ok
>     end.
>
> +
>  replication_complete(DocId) ->
>     case stop_replication(DocId) of
> -    {ok, {Base, Ext}} ->
> +    {ok, RepId} ->
>         ?LOG_INFO("Replication `~s` finished (triggered by document `~s`)",
> -            [Base ++ Ext, DocId]);
> +            [pp_rep_id(RepId), DocId]);
>     none ->
>         ok
>     end.
>
> +
>  stop_replication(DocId) ->
> -    case ets:lookup(?DOC_TO_REP_ID_MAP, DocId) of
> +    case ets:lookup(?DOC_ID_TO_REP_ID, DocId) of
>     [{DocId, {BaseId, _} = RepId}] ->
>         couch_rep:end_replication(RepId),
> -        true = ets:delete(?REP_ID_TO_DOC_ID_MAP, BaseId),
> -        true = ets:delete(?DOC_TO_REP_ID_MAP, DocId),
> +        true = ets:delete(?REP_ID_TO_DOC_ID, BaseId),
> +        true = ets:delete(?DOC_ID_TO_REP_ID, DocId),
>         {ok, RepId};
>     [] ->
>         none
>     end.
>
> +
>  stop_all_replications() ->
> +    ?LOG_INFO("Stopping all ongoing replications because the replicator DB "
> +        "was deleted or changed", []),
>     ets:foldl(
>         fun({_, RepId}, _) -> couch_rep:end_replication(RepId) end,
>         ok,
> -        ?DOC_TO_REP_ID_MAP
> +        ?DOC_ID_TO_REP_ID
>     ),
> -    true = ets:delete_all_objects(?REP_ID_TO_DOC_ID_MAP),
> -    true = ets:delete_all_objects(?DOC_TO_REP_ID_MAP).
> +    true = ets:delete_all_objects(?REP_ID_TO_DOC_ID),
> +    true = ets:delete_all_objects(?DOC_ID_TO_REP_ID).
> +
> +
> +% pretty-print replication id
> +pp_rep_id({Base, Extension}) ->
> +    Base ++ Extension.
>
>
>

Is there any reason you are using named tables here? Why not just use
ets ids? Also, why use macros?

Re: svn commit: r1062772 - /couchdb/trunk/src/couchdb/couch_rep_db_listener.erl

Posted by Benoit Chesneau <bc...@gmail.com>.
On Tue, Jan 25, 2011 at 6:41 PM, Filipe David Manana
<fd...@apache.org> wrote:
> On Tue, Jan 25, 2011 at 5:31 PM, Benoit Chesneau <bc...@gmail.com> wrote:
>>
>> Is there any reason you are using named table here ? Why not just use
>> ets ids ? Also why using macros ?
>>
>
> Yes. Not using the IDs returned by ets:new/2 because that means saving
> them in the state and having to pass them to every helper function.
> Like this, it makes the code shorter by not passing so many parameters
> to each function.
> Macros are used because: if you need to change the value (ets table
> name), you'll only do it in one place.
>
>
Well, using macros for that is a little overkill imo :) Or rather, if
you use named tables, that's because you don't want to get the tables by
their identifiers. Dunno.

- benoît

Re: svn commit: r1062772 - /couchdb/trunk/src/couchdb/couch_rep_db_listener.erl

Posted by Filipe David Manana <fd...@apache.org>.
On Tue, Jan 25, 2011 at 5:31 PM, Benoit Chesneau <bc...@gmail.com> wrote:
>
> Is there any reason you are using named table here ? Why not just use
> ets ids ? Also why using macros ?
>

Yes. I'm not using the IDs returned by ets:new/2 because that would mean
saving them in the state and passing them to every helper function.
Using named tables makes the code shorter by not passing so many
parameters to each function.
Macros are used so that if you need to change the value (the ets table
name), you only have to do it in one place.



-- 
Filipe David Manana,
fdmanana@gmail.com, fdmanana@apache.org

"Reasonable men adapt themselves to the world.
 Unreasonable men adapt the world to themselves.
 That's why all progress depends on unreasonable men."