You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/08/03 19:16:05 UTC

svn commit: r981973 - in /couchdb/branches/new_replicator/src/couchdb: Makefile.am couch_api_wrap.erl couch_httpd_rep.erl couch_replicate.erl couch_server_sup.erl

Author: fdmanana
Date: Tue Aug  3 17:16:04 2010
New Revision: 981973

URL: http://svn.apache.org/viewvc?rev=981973&view=rev
Log:
Moved the new replicator code into an OTP gen_server (and under a supervision tree).
Fully working but needs to be more polished.


Modified:
    couchdb/branches/new_replicator/src/couchdb/Makefile.am
    couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
    couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
    couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
    couchdb/branches/new_replicator/src/couchdb/couch_server_sup.erl

Modified: couchdb/branches/new_replicator/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/Makefile.am?rev=981973&r1=981972&r2=981973&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/Makefile.am (original)
+++ couchdb/branches/new_replicator/src/couchdb/Makefile.am Tue Aug  3 17:16:04 2010
@@ -80,6 +80,7 @@ source_files = \
     couch_db_updater.erl \
     couch_work_queue.erl \
     couch_replicate.erl \
+    couch_replication_notifier.erl \
     couch_httpd_rep.erl \
     couch_api_wrap.erl \
     json_stream_parse.erl
@@ -142,6 +143,7 @@ compiled_files = \
     couch_db_updater.beam \
     couch_work_queue.beam \
     couch_replicate.beam \
+    couch_replication_notifier.beam \
     couch_httpd_rep.beam \
     couch_api_wrap.beam \
     json_stream_parse.beam

Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=981973&r1=981972&r2=981973&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Tue Aug  3 17:16:04 2010
@@ -52,15 +52,20 @@ db_open(Db, Options) ->
 
 db_open(#httpdb{} = Db, _Options, Create) ->
     #httpdb{url=Url, oauth=OAuth, headers=Headers} = Db,
+    Headers2 = oauth_header(Url, [], put, OAuth) ++ Headers,
     case Create of
     false ->
         ok;
     true ->
-        Headers2 = oauth_header(Url, [], put, OAuth) ++ Headers,
-        catch ibrowse:send_req(Url, Headers2, put, [],
-            [{response_format, binary}], infinity)
+        catch ibrowse:send_req(Url, Headers2, put)
     end,
-    {ok, Db};
+    case (catch ibrowse:send_req(Url, Headers2, head)) of
+    {ok, "200", _, _} ->
+        {ok, Db};
+    {ok, _Code, _, _} ->
+        % TODO deal with HTTP redirects
+        throw({db_not_found, ?l2b(Url)})
+    end;
 db_open(DbName, Options, Create) ->
     case Create of
     false ->
@@ -73,7 +78,12 @@ db_open(DbName, Options, Create) ->
             ok
         end
     end,
-    couch_db:open(DbName,Options).
+    case couch_db:open(DbName, Options) of
+    {not_found, _Reason} ->
+        throw({db_not_found, DbName});
+    {ok, _Db2} = Success ->
+        Success
+    end.
 
 db_close(#httpdb{}) ->
     ok;

Modified: couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl?rev=981973&r1=981972&r2=981973&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl Tue Aug  3 17:16:04 2010
@@ -33,7 +33,11 @@ handle_req(#httpd{method='POST'} = Req) 
     SrcDb = parse_rep_db(couch_util:get_value(<<"source">>, PostBody)),
     TgtDb = parse_rep_db(couch_util:get_value(<<"target">>, PostBody)),
     Options = convert_options(PostBody),
-    try couch_replicate:start(SrcDb, TgtDb, Options, Req#httpd.user_ctx) of
+    try couch_replicate:replicate(SrcDb, TgtDb, Options, Req#httpd.user_ctx) of
+    {error, Reason} ->
+        send_json(Req, 500, {[{error, Reason}]});
+    {ok, {cancelled, RepId}} ->
+        send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
     {ok, {HistoryResults}} ->
         send_json(Req, {[{ok, true} | HistoryResults]})
     catch

Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=981973&r1=981972&r2=981973&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Tue Aug  3 17:16:04 2010
@@ -11,14 +11,30 @@
 % the License.
 
 -module(couch_replicate).
+-behaviour(gen_server).
 
--export([start/4]).
+% public API
+-export([replicate/4]).
+
+% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
 
 -include("couch_db.hrl").
 -include("couch_api_wrap.hrl").
 
 
+-record(stats, {
+    missing_checked = 0,
+    missing_found = 0,
+    docs_read = 0,
+    docs_written = 0,
+    doc_write_failures = 0
+    }).
+
 -record(rep_state, {
+    rep_id,
+    rep_options,
     source_name,
     target_name,
     source,
@@ -33,25 +49,118 @@
     rep_starttime,
     src_starttime,
     tgt_starttime,
-    timer % checkpoint timer
+    timer, % checkpoint timer
+    missing_revs_queue,
+    changes_queue,
+    changes_reader,
+    missing_revs_finder,
+    doc_copier,
+    seqs_in_progress = gb_trees:from_orddict([]),
+    stats = #stats{}
     }).
 
--record(stats, {
-    missing_checked=0,
-    missing_found=0,
-    docs_read=0,
-    docs_written=0,
-    doc_write_failures=0
-    }).
-        
-start(Src, Tgt, Options, UserCtx) ->
-    
-    _Continuous = couch_util:get_value(continuous, Options, false),
-    
-    % initalize the replication state, looking for existing rep records
-    % for incremental replication.
-    #rep_state{source=Source, target=Target, start_seq=StartSeq} = State =
-            init_state(Src, Tgt, Options, UserCtx), 
+
+replicate(Src, Tgt, Options, UserCtx) ->
+    RepId = make_replication_id(Src, Tgt, UserCtx, Options),
+    case couch_util:get_value(cancel, Options, false) of
+    true ->
+        end_replication(RepId);
+    false ->
+        {ok, Listener} = rep_result_listener(RepId),
+        {ok, _Pid} = start_replication(RepId, Src, Tgt, Options, UserCtx),
+        wait_for_result(RepId, Listener)
+    end.
+
+
+start_replication({BaseId, Extension} = RepId, Src, Tgt, Options, UserCtx) ->
+    RepChildId = BaseId ++ Extension,
+    ChildSpec = {
+        RepChildId,
+        {gen_server, start_link,
+           [?MODULE, [RepId, Src, Tgt, Options, UserCtx], []]},
+        transient,
+        1,
+        worker,
+        [?MODULE]
+    },
+    RepPid = case supervisor:start_child(couch_rep_sup, ChildSpec) of
+    {ok, Pid} ->
+        ?LOG_INFO("starting new replication ~p at ~p", [RepChildId, Pid]),
+        Pid;
+    {error, already_present} ->
+        case supervisor:restart_child(couch_rep_sup, RepChildId) of
+        {ok, Pid} ->
+            ?LOG_INFO("starting replication ~p at ~p", [RepChildId, Pid]),
+            Pid;
+        {error, running} ->
+            %% this error occurs if multiple replicators are racing
+            %% each other to start and somebody else won.  Just grab
+            %% the Pid by calling start_child again.
+            {error, {already_started, Pid}} =
+                supervisor:start_child(couch_rep_sup, ChildSpec),
+            ?LOG_DEBUG("replication ~p already running at ~p",
+                [RepChildId, Pid]),
+            Pid;
+        {error, {db_not_found, DbUrl}} ->
+            throw({db_not_found, <<"could not open ", DbUrl/binary>>})
+        end;
+    {error, {already_started, Pid}} ->
+        ?LOG_DEBUG("replication ~p already running at ~p", [RepChildId, Pid]),
+        Pid;
+    {error, {{db_not_found, DbUrl}, _}} ->
+        throw({db_not_found, <<"could not open ", DbUrl/binary>>})
+    end,
+    {ok, RepPid}.
+
+
+rep_result_listener(RepId) ->
+    ReplyTo = self(),
+    {ok, _Listener} = couch_replication_notifier:start_link(
+        fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
+                ReplyTo ! Ev;
+            (_) ->
+                ok
+        end).
+
+
+wait_for_result(RepId, Listener) ->
+    Result = receive
+    {finished, RepId, RepResult} ->
+        {ok, RepResult};
+    {error, RepId, Reason} ->
+        {error, Reason}
+    end,
+    couch_replication_notifier:stop(Listener),
+    Result.
+
+
+end_replication({BaseId, Extension}) ->
+    FullRepId = BaseId ++ Extension,
+    case supervisor:terminate_child(couch_rep_sup, FullRepId) of
+    {error, not_found} = R ->
+        R;
+    ok ->
+        ok = supervisor:delete_child(couch_rep_sup, FullRepId),
+        {ok, {cancelled, ?l2b(BaseId)}}
+    end.
+
+
+init(InitArgs) ->
+    try
+        do_init(InitArgs)
+    catch
+    throw:{db_not_found, DbUrl} ->
+        {stop, {db_not_found, DbUrl}}
+    end.
+
+do_init([RepId, Src, Tgt, Options, UserCtx]) ->
+    process_flag(trap_exit, true),
+
+    #rep_state{
+        source = Source,
+        target = Target,
+        start_seq = StartSeq
+    } = State = init_state(RepId, Src, Tgt, Options, UserCtx),
 
     {ok, MissingRevsQueue} = couch_work_queue:new(100000, 500),
 
@@ -59,16 +168,21 @@ start(Src, Tgt, Options, UserCtx) ->
     undefined ->
         {ok, ChangesQueue} = couch_work_queue:new(100000, 500),
 
-        % this is starts the _changes reader process. It adds the changes from
+        % This starts the _changes reader process. It adds the changes from
         % the source db to the ChangesQueue.
-        spawn_changes_reader(self(), StartSeq, Source, ChangesQueue, Options),
+        ChangesReader = spawn_changes_reader(self(), StartSeq, Source,
+            ChangesQueue, Options),
 
-        % this starts the missing revs finder, it checks the target for changes
+        % This starts the missing revs finder. It checks the target for changes
         % in the ChangesQueue to see if they exist on the target or not. If not,
         % adds them to MissingRevsQueue.
-        spawn_missing_revs_finder(self(), Target, ChangesQueue,
-            MissingRevsQueue);
+        MissingRevsFinder = spawn_missing_revs_finder(self(), Target,
+            ChangesQueue, MissingRevsQueue);
     DocIds ->
+        ChangesQueue = nil,
+        ChangesReader = nil,
+        MissingRevsFinder = nil,
+
         lists:foreach(
             fun(DocId) ->
                 ok = couch_work_queue:queue(MissingRevsQueue, {doc_id, DocId})
@@ -76,21 +190,155 @@ start(Src, Tgt, Options, UserCtx) ->
         couch_work_queue:close(MissingRevsQueue)
     end,
 
-    % This starts the doc copy process. It gets the documents from the
-    % MissingRevsQueue, copying them from the source to the target database.
-    spawn_doc_copy(self(), Source, Target, MissingRevsQueue),
-    
-    % This is the checkpoint loop, it updates the replication record in the
-    % database every X seconds, so that if the replication is interuppted,
-    % it can restart near where it left off.
-    {ok, State2, Stats} = checkpoint_loop(State, gb_trees:from_orddict([]),
-            #stats{}),
-    couch_api_wrap:db_close(Source),        
-    couch_api_wrap:db_close(Target),
-    {ok, get_result(State2, Stats, Options)}.
+    % This starts the doc copy process. It fetches documents from the
+    % MissingRevsQueue and copies them from the source to the target database.
+    DocCopier = spawn_doc_copy(self(), Source, Target, MissingRevsQueue),
+
+    {ok, State#rep_state{
+            missing_revs_queue = MissingRevsQueue,
+            changes_queue = ChangesQueue,
+            changes_reader = ChangesReader,
+            missing_revs_finder = MissingRevsFinder,
+            doc_copier = DocCopier
+        }
+    }.
+
+
+handle_info({seq_start, {Seq, NumChanges}}, State) ->
+    SeqsInProgress2 = gb_trees:insert(Seq, NumChanges,
+        State#rep_state.seqs_in_progress),
+    {noreply, State#rep_state{seqs_in_progress = SeqsInProgress2}};
+
+handle_info({seq_changes_done, {Seq, NumChangesDone}}, State) ->
+    #rep_state{seqs_in_progress = SeqsInProgress} = State,
+    % Decrement the # changes for this seq by NumChangesDone.
+    TotalChanges = gb_trees:get(Seq, State#rep_state.seqs_in_progress),
+    NewState = case TotalChanges - NumChangesDone of
+    0 ->
+        % This seq is completely processed. Check to see if it was the
+        % smallest seq in progess. If so, we've made progress that can
+        % be checkpointed.
+        State2 = case gb_trees:smallest(SeqsInProgress) of
+        {Seq, _} ->
+            State#rep_state{current_through_seq = Seq};
+        _ ->
+            State
+        end,
+        State2#rep_state{
+            seqs_in_progress = gb_trees:delete(Seq, SeqsInProgress)
+        };
+    NewTotalChanges when NewTotalChanges > 0 ->
+        % There are still some changes that need work done.
+        % Put the new count back.
+        State#rep_state{
+            seqs_in_progress =
+                gb_trees:update(Seq, NewTotalChanges, SeqsInProgress)
+        }
+    end,
+    {noreply, NewState};
+
+handle_info({add_stat, {StatPos, Val}}, #rep_state{stats = Stats} = State) ->
+    Stat = element(StatPos, Stats),
+    NewStats = setelement(StatPos, Stats, Stat + Val),
+    {noreply, State#rep_state{stats = NewStats}};
+
+handle_info(timed_checkpoint, State) ->
+    State2 = do_checkpoint(State),
+    NewTimer = erlang:start_timer(checkpoint_interval(State2),
+        self(), timed_checkpoint),
+    {noreply, State2#rep_state{timer = NewTimer}};
+
+handle_info(done, #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
+    % This means all the worker processes have completed their work.
+    % Assert that all the seqs have been processed
+    0 = gb_trees:size(SeqsInProgress),
+    NewState = do_checkpoint(State),
+    cancel_timer(NewState),
+    {stop, normal, NewState};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
+    {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
+    ?LOG_ERROR("ChangesReader process died with reason: ~p", [Reason]),
+    {stop, changes_reader_died, State};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{missing_revs_finder=Pid} = St) ->
+    {noreply, St};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{missing_revs_finder=Pid} = St) ->
+    ?LOG_ERROR("MissingRevsFinder process died with reason: ~p", [Reason]),
+    {stop, missing_revs_finder_died, St};
 
+handle_info({'EXIT', Pid, normal}, #rep_state{doc_copier=Pid} = State) ->
+    {noreply, State};
 
-get_result(State, Stats, Options) ->
+handle_info({'EXIT', Pid, Reason}, #rep_state{doc_copier=Pid} = State) ->
+    ?LOG_ERROR("DocCopier process died with reason: ~p", [Reason]),
+    {stop, doc_copier_died, State};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{missing_revs_queue=Pid} = St) ->
+    {noreply, St};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{missing_revs_queue=Pid} = St) ->
+    ?LOG_ERROR("MissingRevsQueue process died with reason: ~p", [Reason]),
+    {stop, missing_revs_queue_died, St};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
+    {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
+    ?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]),
+    {stop, changes_queue_died, State}.
+
+
+handle_call(Msg, _From, State) ->
+    ?LOG_ERROR("Replicator received an unexpected synchronous call: ~p", [Msg]),
+    {stop, unexpected_sync_message, State}.
+
+
+handle_cast(Msg, State) ->
+    ?LOG_ERROR("Replicator received an unexpected asynchronous call: ~p", [Msg]),
+    {stop, unexpected_async_message, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+
+terminate(normal, #rep_state{rep_id = RepId} = State) ->
+    terminate_cleanup(State),
+    couch_replication_notifier:notify({finished, RepId, get_result(State)});
+
+terminate(shutdown, State) ->
+    % cancelled replication throught ?MODULE:end_replication/1
+    terminate_cleanup(State);
+
+terminate(Reason, #rep_state{rep_id = RepId} = State) ->
+    terminate_cleanup(State),
+    couch_replication_notifier:notify({error, RepId, Reason}).
+
+
+terminate_cleanup(State) ->
+    #rep_state{
+        missing_revs_queue = MissingRevsQueue,
+        changes_queue = ChangesQueue,
+        source = Source,
+        target = Target
+    } = State,
+    couch_work_queue:close(MissingRevsQueue),
+    couch_work_queue:close(ChangesQueue),
+    couch_api_wrap:db_close(Source),
+    couch_api_wrap:db_close(Target).
+
+
+cancel_timer(#rep_state{timer = nil}) ->
+    ok;
+cancel_timer(#rep_state{timer = Timer}) ->
+    erlang:cancel_timer(Timer).
+
+
+get_result(#rep_state{stats = Stats, rep_options = Options} = State) ->
     case couch_util:get_value(doc_ids, Options) of
     undefined ->
         State#rep_state.checkpoint_history;
@@ -105,7 +353,7 @@ get_result(State, Stats, Options) ->
     end.
 
 
-init_state(Src,Tgt,Options,UserCtx)->    
+init_state({BaseId, _Ext} = RepId, Src, Tgt, Options, UserCtx) ->
     {ok, Source} = couch_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
     {ok, Target} = couch_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
         couch_util:get_value(create_target, Options, false)),
@@ -113,8 +361,7 @@ init_state(Src,Tgt,Options,UserCtx)->   
     {ok, SourceInfo} = couch_api_wrap:get_db_info(Source),
     {ok, TargetInfo} = couch_api_wrap:get_db_info(Target),
 
-    RepId = make_replication_id(Src, Tgt, UserCtx, Options),
-    DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+    DocId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
     case couch_api_wrap:open_doc(Source, DocId, []) of
     {ok, SourceLog} ->  SourceLog;
     _ ->                SourceLog = #doc{id=DocId}
@@ -126,6 +373,8 @@ init_state(Src,Tgt,Options,UserCtx)->   
     {StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
     #doc{body={CheckpointHistory}} = SourceLog,
     State = #rep_state{
+        rep_id = RepId,
+        rep_options = Options,
         source_name = Src,
         target_name = Tgt,
         source = Source,
@@ -265,72 +514,13 @@ doc_handler({ok, Doc}, Target, Cp) ->
 doc_handler(_, _, _) ->
     ok.
 
-checkpoint_loop(State, SeqsInProgress, Stats) ->
-    % SeqsInProgress contains the number of revs for each seq found by the
-    % changes process.
-    receive
-    {seq_start, {Seq, NumChanges}} ->
-        % Add this seq to the SeqsInProgress
-        SeqsInProgress2 = gb_trees:insert(Seq, NumChanges, SeqsInProgress),
-        checkpoint_loop(State, SeqsInProgress2, Stats);
-    {seq_changes_done, {Seq, NumChangesDone}} ->
-        % decrement the # changes for this seq by NumChangesDone 
-        TotalChanges = gb_trees:get(Seq, SeqsInProgress),
-        case TotalChanges - NumChangesDone of
-        0 ->
-            % this seq is completely processed. Check to see if it was the
-            % smallest seq in progess. If so, we've made progress that can
-            % be checkpointed.
-            State2 =
-            case gb_trees:smallest(SeqsInProgress) of
-            {Seq, _} ->
-                State#rep_state{current_through_seq=Seq};
-            _ ->
-                State
-            end,
-            checkpoint_loop(State2, gb_trees:delete(Seq,SeqsInProgress), Stats);
-        NewTotalChanges when NewTotalChanges > 0 ->
-            % Still some changes that need work done. Put the new count back.
-            SeqsInProgress2 =
-                gb_trees:update(Seq, NewTotalChanges, SeqsInProgress),
-            checkpoint_loop(State, SeqsInProgress2, Stats)
-        end;
-    {add_stat, {StatPos, Val}} ->
-        % Increment the stat at the pos.
-        Stat = element(StatPos, Stats),
-        Stats2 = setelement(StatPos, Stats, Stat + Val),
-        checkpoint_loop(State, SeqsInProgress, Stats2);
-    done ->
-        % This means all the worker processes have completed their work.
-        % Assert that all the seqs have been processed
-        0 = gb_trees:size(SeqsInProgress),
-        State2 = do_checkpoint(State, Stats),
-        cancel_timer(State2),
-        receive timed_checkpoint -> ok
-        after 0 -> ok
-        end,
-        {ok, State2, Stats};
-    timed_checkpoint ->
-        % every checkpoint interval while processing
-        State2 = do_checkpoint(State, Stats),
-        Timer = erlang:start_timer(checkpoint_interval(State), 
-            self(), timed_checkpoint),
-        checkpoint_loop(State2#rep_state{timer=Timer}, SeqsInProgress, Stats)
-    end.
-
-cancel_timer(#rep_state{timer = nil}) ->
-    ok;
-cancel_timer(#rep_state{timer = Timer}) ->
-    erlang:cancel_timer(Timer).
-
 
 checkpoint_interval(_State) ->
     5000.
 
-do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=OldSeq} = State,
-        _Stats) when Seq == OldSeq ->
+do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
     State;
-do_checkpoint(State, Stats) ->
+do_checkpoint(State) ->
     #rep_state{
         source_name=SourceName,
         target_name=TargetName,
@@ -343,7 +533,8 @@ do_checkpoint(State, Stats) ->
         target_log = TargetLog,
         rep_starttime = ReplicationStartTime,
         src_starttime = SrcInstanceStartTime,
-        tgt_starttime = TgtInstanceStartTime
+        tgt_starttime = TgtInstanceStartTime,
+        stats = Stats
     } = State,
     case commit_to_both(Source, Target) of
     {SrcInstanceStartTime, TgtInstanceStartTime} ->
@@ -414,8 +605,6 @@ commit_to_both(Source, Target) ->
     {SourceStartTime, TargetStartTime}.
 
 
-
-
 make_replication_id(Source, Target, UserCtx, Options) ->
     %% funky algorithm to preserve backwards compatibility
     {ok, HostName} = inet:gethostname(),
@@ -434,7 +623,21 @@ make_replication_id(Source, Target, User
         Filter ->
             [Filter, couch_util:get_value(query_params, Options, {[]})]
         end,
-    couch_util:to_hex(couch_util:md5(term_to_binary(Base))).
+    Extension = maybe_append_options([continuous, create_target], Options),
+    {couch_util:to_hex(couch_util:md5(term_to_binary(Base))), Extension}.
+
+
+maybe_append_options(Options, RepOptions) ->
+    lists:foldl(fun(Option, Acc) ->
+        Acc ++
+        case couch_util:get_value(Option, RepOptions, false) of
+        true ->
+            "+" ++ atom_to_list(Option);
+        false ->
+            ""
+        end
+    end, [], Options).
+
 
 get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
     case OAuth of

Modified: couchdb/branches/new_replicator/src/couchdb/couch_server_sup.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_server_sup.erl?rev=981973&r1=981972&r2=981973&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_server_sup.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_server_sup.erl Tue Aug  3 17:16:04 2010
@@ -157,6 +157,12 @@ start_primary_services() ->
                 permanent,
                 brutal_kill,
                 worker,
+                dynamic},
+            {couch_replication_event,
+                {gen_event, start_link, [{local, couch_replication}]},
+                permanent,
+                brutal_kill,
+                worker,
                 dynamic}
             ]
         }).