You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/08/03 19:16:05 UTC
svn commit: r981973 - in /couchdb/branches/new_replicator/src/couchdb:
Makefile.am couch_api_wrap.erl couch_httpd_rep.erl couch_replicate.erl
couch_server_sup.erl
Author: fdmanana
Date: Tue Aug 3 17:16:04 2010
New Revision: 981973
URL: http://svn.apache.org/viewvc?rev=981973&view=rev
Log:
Moved the new replicator code into an OTP gen_server (and under a supervision tree).
Fully working but needs to be more polished.
Modified:
couchdb/branches/new_replicator/src/couchdb/Makefile.am
couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
couchdb/branches/new_replicator/src/couchdb/couch_server_sup.erl
Modified: couchdb/branches/new_replicator/src/couchdb/Makefile.am
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/Makefile.am?rev=981973&r1=981972&r2=981973&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/Makefile.am (original)
+++ couchdb/branches/new_replicator/src/couchdb/Makefile.am Tue Aug 3 17:16:04 2010
@@ -80,6 +80,7 @@ source_files = \
couch_db_updater.erl \
couch_work_queue.erl \
couch_replicate.erl \
+ couch_replication_notifier.erl \
couch_httpd_rep.erl \
couch_api_wrap.erl \
json_stream_parse.erl
@@ -142,6 +143,7 @@ compiled_files = \
couch_db_updater.beam \
couch_work_queue.beam \
couch_replicate.beam \
+ couch_replication_notifier.beam \
couch_httpd_rep.beam \
couch_api_wrap.beam \
json_stream_parse.beam
Modified: couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl?rev=981973&r1=981972&r2=981973&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_api_wrap.erl Tue Aug 3 17:16:04 2010
@@ -52,15 +52,20 @@ db_open(Db, Options) ->
db_open(#httpdb{} = Db, _Options, Create) ->
#httpdb{url=Url, oauth=OAuth, headers=Headers} = Db,
+ Headers2 = oauth_header(Url, [], put, OAuth) ++ Headers,
case Create of
false ->
ok;
true ->
- Headers2 = oauth_header(Url, [], put, OAuth) ++ Headers,
- catch ibrowse:send_req(Url, Headers2, put, [],
- [{response_format, binary}], infinity)
+ catch ibrowse:send_req(Url, Headers2, put)
end,
- {ok, Db};
+ case (catch ibrowse:send_req(Url, Headers2, head)) of
+ {ok, "200", _, _} ->
+ {ok, Db};
+ {ok, _Code, _, _} ->
+ % TODO deal with HTTP redirects
+ throw({db_not_found, ?l2b(Url)})
+ end;
db_open(DbName, Options, Create) ->
case Create of
false ->
@@ -73,7 +78,12 @@ db_open(DbName, Options, Create) ->
ok
end
end,
- couch_db:open(DbName,Options).
+ case couch_db:open(DbName, Options) of
+ {not_found, _Reason} ->
+ throw({db_not_found, DbName});
+ {ok, _Db2} = Success ->
+ Success
+ end.
db_close(#httpdb{}) ->
ok;
Modified: couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl?rev=981973&r1=981972&r2=981973&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_httpd_rep.erl Tue Aug 3 17:16:04 2010
@@ -33,7 +33,11 @@ handle_req(#httpd{method='POST'} = Req)
SrcDb = parse_rep_db(couch_util:get_value(<<"source">>, PostBody)),
TgtDb = parse_rep_db(couch_util:get_value(<<"target">>, PostBody)),
Options = convert_options(PostBody),
- try couch_replicate:start(SrcDb, TgtDb, Options, Req#httpd.user_ctx) of
+ try couch_replicate:replicate(SrcDb, TgtDb, Options, Req#httpd.user_ctx) of
+ {error, Reason} ->
+ send_json(Req, 500, {[{error, Reason}]});
+ {ok, {cancelled, RepId}} ->
+ send_json(Req, 200, {[{ok, true}, {<<"_local_id">>, RepId}]});
{ok, {HistoryResults}} ->
send_json(Req, {[{ok, true} | HistoryResults]})
catch
Modified: couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl?rev=981973&r1=981972&r2=981973&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_replicate.erl Tue Aug 3 17:16:04 2010
@@ -11,14 +11,30 @@
% the License.
-module(couch_replicate).
+-behaviour(gen_server).
--export([start/4]).
+% public API
+-export([replicate/4]).
+
+% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
-include("couch_db.hrl").
-include("couch_api_wrap.hrl").
+-record(stats, {
+ missing_checked = 0,
+ missing_found = 0,
+ docs_read = 0,
+ docs_written = 0,
+ doc_write_failures = 0
+ }).
+
-record(rep_state, {
+ rep_id,
+ rep_options,
source_name,
target_name,
source,
@@ -33,25 +49,118 @@
rep_starttime,
src_starttime,
tgt_starttime,
- timer % checkpoint timer
+ timer, % checkpoint timer
+ missing_revs_queue,
+ changes_queue,
+ changes_reader,
+ missing_revs_finder,
+ doc_copier,
+ seqs_in_progress = gb_trees:from_orddict([]),
+ stats = #stats{}
}).
--record(stats, {
- missing_checked=0,
- missing_found=0,
- docs_read=0,
- docs_written=0,
- doc_write_failures=0
- }).
-
-start(Src, Tgt, Options, UserCtx) ->
-
- _Continuous = couch_util:get_value(continuous, Options, false),
-
- % initalize the replication state, looking for existing rep records
- % for incremental replication.
- #rep_state{source=Source, target=Target, start_seq=StartSeq} = State =
- init_state(Src, Tgt, Options, UserCtx),
+
+replicate(Src, Tgt, Options, UserCtx) ->
+ RepId = make_replication_id(Src, Tgt, UserCtx, Options),
+ case couch_util:get_value(cancel, Options, false) of
+ true ->
+ end_replication(RepId);
+ false ->
+ {ok, Listener} = rep_result_listener(RepId),
+ {ok, _Pid} = start_replication(RepId, Src, Tgt, Options, UserCtx),
+ wait_for_result(RepId, Listener)
+ end.
+
+
+start_replication({BaseId, Extension} = RepId, Src, Tgt, Options, UserCtx) ->
+ RepChildId = BaseId ++ Extension,
+ ChildSpec = {
+ RepChildId,
+ {gen_server, start_link,
+ [?MODULE, [RepId, Src, Tgt, Options, UserCtx], []]},
+ transient,
+ 1,
+ worker,
+ [?MODULE]
+ },
+ RepPid = case supervisor:start_child(couch_rep_sup, ChildSpec) of
+ {ok, Pid} ->
+ ?LOG_INFO("starting new replication ~p at ~p", [RepChildId, Pid]),
+ Pid;
+ {error, already_present} ->
+ case supervisor:restart_child(couch_rep_sup, RepChildId) of
+ {ok, Pid} ->
+ ?LOG_INFO("starting replication ~p at ~p", [RepChildId, Pid]),
+ Pid;
+ {error, running} ->
+ %% this error occurs if multiple replicators are racing
+ %% each other to start and somebody else won. Just grab
+ %% the Pid by calling start_child again.
+ {error, {already_started, Pid}} =
+ supervisor:start_child(couch_rep_sup, ChildSpec),
+ ?LOG_DEBUG("replication ~p already running at ~p",
+ [RepChildId, Pid]),
+ Pid;
+ {error, {db_not_found, DbUrl}} ->
+ throw({db_not_found, <<"could not open ", DbUrl/binary>>})
+ end;
+ {error, {already_started, Pid}} ->
+ ?LOG_DEBUG("replication ~p already running at ~p", [RepChildId, Pid]),
+ Pid;
+ {error, {{db_not_found, DbUrl}, _}} ->
+ throw({db_not_found, <<"could not open ", DbUrl/binary>>})
+ end,
+ {ok, RepPid}.
+
+
+rep_result_listener(RepId) ->
+ ReplyTo = self(),
+ {ok, _Listener} = couch_replication_notifier:start_link(
+ fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
+ ReplyTo ! Ev;
+ (_) ->
+ ok
+ end).
+
+
+wait_for_result(RepId, Listener) ->
+ Result = receive
+ {finished, RepId, RepResult} ->
+ {ok, RepResult};
+ {error, RepId, Reason} ->
+ {error, Reason}
+ end,
+ couch_replication_notifier:stop(Listener),
+ Result.
+
+
+end_replication({BaseId, Extension}) ->
+ FullRepId = BaseId ++ Extension,
+ case supervisor:terminate_child(couch_rep_sup, FullRepId) of
+ {error, not_found} = R ->
+ R;
+ ok ->
+ ok = supervisor:delete_child(couch_rep_sup, FullRepId),
+ {ok, {cancelled, ?l2b(BaseId)}}
+ end.
+
+
+init(InitArgs) ->
+ try
+ do_init(InitArgs)
+ catch
+ throw:{db_not_found, DbUrl} ->
+ {stop, {db_not_found, DbUrl}}
+ end.
+
+do_init([RepId, Src, Tgt, Options, UserCtx]) ->
+ process_flag(trap_exit, true),
+
+ #rep_state{
+ source = Source,
+ target = Target,
+ start_seq = StartSeq
+ } = State = init_state(RepId, Src, Tgt, Options, UserCtx),
{ok, MissingRevsQueue} = couch_work_queue:new(100000, 500),
@@ -59,16 +168,21 @@ start(Src, Tgt, Options, UserCtx) ->
undefined ->
{ok, ChangesQueue} = couch_work_queue:new(100000, 500),
- % this is starts the _changes reader process. It adds the changes from
+ % This starts the _changes reader process. It adds the changes from
% the source db to the ChangesQueue.
- spawn_changes_reader(self(), StartSeq, Source, ChangesQueue, Options),
+ ChangesReader = spawn_changes_reader(self(), StartSeq, Source,
+ ChangesQueue, Options),
- % this starts the missing revs finder, it checks the target for changes
+ % This starts the missing revs finder. It checks the target for changes
% in the ChangesQueue to see if they exist on the target or not. If not,
% adds them to MissingRevsQueue.
- spawn_missing_revs_finder(self(), Target, ChangesQueue,
- MissingRevsQueue);
+ MissingRevsFinder = spawn_missing_revs_finder(self(), Target,
+ ChangesQueue, MissingRevsQueue);
DocIds ->
+ ChangesQueue = nil,
+ ChangesReader = nil,
+ MissingRevsFinder = nil,
+
lists:foreach(
fun(DocId) ->
ok = couch_work_queue:queue(MissingRevsQueue, {doc_id, DocId})
@@ -76,21 +190,155 @@ start(Src, Tgt, Options, UserCtx) ->
couch_work_queue:close(MissingRevsQueue)
end,
- % This starts the doc copy process. It gets the documents from the
- % MissingRevsQueue, copying them from the source to the target database.
- spawn_doc_copy(self(), Source, Target, MissingRevsQueue),
-
- % This is the checkpoint loop, it updates the replication record in the
- % database every X seconds, so that if the replication is interuppted,
- % it can restart near where it left off.
- {ok, State2, Stats} = checkpoint_loop(State, gb_trees:from_orddict([]),
- #stats{}),
- couch_api_wrap:db_close(Source),
- couch_api_wrap:db_close(Target),
- {ok, get_result(State2, Stats, Options)}.
+ % This starts the doc copy process. It fetches documents from the
+ % MissingRevsQueue and copies them from the source to the target database.
+ DocCopier = spawn_doc_copy(self(), Source, Target, MissingRevsQueue),
+
+ {ok, State#rep_state{
+ missing_revs_queue = MissingRevsQueue,
+ changes_queue = ChangesQueue,
+ changes_reader = ChangesReader,
+ missing_revs_finder = MissingRevsFinder,
+ doc_copier = DocCopier
+ }
+ }.
+
+
+handle_info({seq_start, {Seq, NumChanges}}, State) ->
+ SeqsInProgress2 = gb_trees:insert(Seq, NumChanges,
+ State#rep_state.seqs_in_progress),
+ {noreply, State#rep_state{seqs_in_progress = SeqsInProgress2}};
+
+handle_info({seq_changes_done, {Seq, NumChangesDone}}, State) ->
+ #rep_state{seqs_in_progress = SeqsInProgress} = State,
+ % Decrement the # changes for this seq by NumChangesDone.
+ TotalChanges = gb_trees:get(Seq, State#rep_state.seqs_in_progress),
+ NewState = case TotalChanges - NumChangesDone of
+ 0 ->
+ % This seq is completely processed. Check to see if it was the
+ % smallest seq in progess. If so, we've made progress that can
+ % be checkpointed.
+ State2 = case gb_trees:smallest(SeqsInProgress) of
+ {Seq, _} ->
+ State#rep_state{current_through_seq = Seq};
+ _ ->
+ State
+ end,
+ State2#rep_state{
+ seqs_in_progress = gb_trees:delete(Seq, SeqsInProgress)
+ };
+ NewTotalChanges when NewTotalChanges > 0 ->
+ % There are still some changes that need work done.
+ % Put the new count back.
+ State#rep_state{
+ seqs_in_progress =
+ gb_trees:update(Seq, NewTotalChanges, SeqsInProgress)
+ }
+ end,
+ {noreply, NewState};
+
+handle_info({add_stat, {StatPos, Val}}, #rep_state{stats = Stats} = State) ->
+ Stat = element(StatPos, Stats),
+ NewStats = setelement(StatPos, Stats, Stat + Val),
+ {noreply, State#rep_state{stats = NewStats}};
+
+handle_info(timed_checkpoint, State) ->
+ State2 = do_checkpoint(State),
+ NewTimer = erlang:start_timer(checkpoint_interval(State2),
+ self(), timed_checkpoint),
+ {noreply, State2#rep_state{timer = NewTimer}};
+
+handle_info(done, #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
+ % This means all the worker processes have completed their work.
+ % Assert that all the seqs have been processed
+ 0 = gb_trees:size(SeqsInProgress),
+ NewState = do_checkpoint(State),
+ cancel_timer(NewState),
+ {stop, normal, NewState};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
+ {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
+ ?LOG_ERROR("ChangesReader process died with reason: ~p", [Reason]),
+ {stop, changes_reader_died, State};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{missing_revs_finder=Pid} = St) ->
+ {noreply, St};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{missing_revs_finder=Pid} = St) ->
+ ?LOG_ERROR("MissingRevsFinder process died with reason: ~p", [Reason]),
+ {stop, missing_revs_finder_died, St};
+handle_info({'EXIT', Pid, normal}, #rep_state{doc_copier=Pid} = State) ->
+ {noreply, State};
-get_result(State, Stats, Options) ->
+handle_info({'EXIT', Pid, Reason}, #rep_state{doc_copier=Pid} = State) ->
+ ?LOG_ERROR("DocCopier process died with reason: ~p", [Reason]),
+ {stop, doc_copier_died, State};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{missing_revs_queue=Pid} = St) ->
+ {noreply, St};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{missing_revs_queue=Pid} = St) ->
+ ?LOG_ERROR("MissingRevsQueue process died with reason: ~p", [Reason]),
+ {stop, missing_revs_queue_died, St};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
+ {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
+ ?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]),
+ {stop, changes_queue_died, State}.
+
+
+handle_call(Msg, _From, State) ->
+ ?LOG_ERROR("Replicator received an unexpected synchronous call: ~p", [Msg]),
+ {stop, unexpected_sync_message, State}.
+
+
+handle_cast(Msg, State) ->
+ ?LOG_ERROR("Replicator received an unexpected asynchronous call: ~p", [Msg]),
+ {stop, unexpected_async_message, State}.
+
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+terminate(normal, #rep_state{rep_id = RepId} = State) ->
+ terminate_cleanup(State),
+ couch_replication_notifier:notify({finished, RepId, get_result(State)});
+
+terminate(shutdown, State) ->
+ % cancelled replication throught ?MODULE:end_replication/1
+ terminate_cleanup(State);
+
+terminate(Reason, #rep_state{rep_id = RepId} = State) ->
+ terminate_cleanup(State),
+ couch_replication_notifier:notify({error, RepId, Reason}).
+
+
+terminate_cleanup(State) ->
+ #rep_state{
+ missing_revs_queue = MissingRevsQueue,
+ changes_queue = ChangesQueue,
+ source = Source,
+ target = Target
+ } = State,
+ couch_work_queue:close(MissingRevsQueue),
+ couch_work_queue:close(ChangesQueue),
+ couch_api_wrap:db_close(Source),
+ couch_api_wrap:db_close(Target).
+
+
+cancel_timer(#rep_state{timer = nil}) ->
+ ok;
+cancel_timer(#rep_state{timer = Timer}) ->
+ erlang:cancel_timer(Timer).
+
+
+get_result(#rep_state{stats = Stats, rep_options = Options} = State) ->
case couch_util:get_value(doc_ids, Options) of
undefined ->
State#rep_state.checkpoint_history;
@@ -105,7 +353,7 @@ get_result(State, Stats, Options) ->
end.
-init_state(Src,Tgt,Options,UserCtx)->
+init_state({BaseId, _Ext} = RepId, Src, Tgt, Options, UserCtx) ->
{ok, Source} = couch_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
{ok, Target} = couch_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
couch_util:get_value(create_target, Options, false)),
@@ -113,8 +361,7 @@ init_state(Src,Tgt,Options,UserCtx)->
{ok, SourceInfo} = couch_api_wrap:get_db_info(Source),
{ok, TargetInfo} = couch_api_wrap:get_db_info(Target),
- RepId = make_replication_id(Src, Tgt, UserCtx, Options),
- DocId = ?l2b(?LOCAL_DOC_PREFIX ++ RepId),
+ DocId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
case couch_api_wrap:open_doc(Source, DocId, []) of
{ok, SourceLog} -> SourceLog;
_ -> SourceLog = #doc{id=DocId}
@@ -126,6 +373,8 @@ init_state(Src,Tgt,Options,UserCtx)->
{StartSeq, History} = compare_replication_logs(SourceLog, TargetLog),
#doc{body={CheckpointHistory}} = SourceLog,
State = #rep_state{
+ rep_id = RepId,
+ rep_options = Options,
source_name = Src,
target_name = Tgt,
source = Source,
@@ -265,72 +514,13 @@ doc_handler({ok, Doc}, Target, Cp) ->
doc_handler(_, _, _) ->
ok.
-checkpoint_loop(State, SeqsInProgress, Stats) ->
- % SeqsInProgress contains the number of revs for each seq found by the
- % changes process.
- receive
- {seq_start, {Seq, NumChanges}} ->
- % Add this seq to the SeqsInProgress
- SeqsInProgress2 = gb_trees:insert(Seq, NumChanges, SeqsInProgress),
- checkpoint_loop(State, SeqsInProgress2, Stats);
- {seq_changes_done, {Seq, NumChangesDone}} ->
- % decrement the # changes for this seq by NumChangesDone
- TotalChanges = gb_trees:get(Seq, SeqsInProgress),
- case TotalChanges - NumChangesDone of
- 0 ->
- % this seq is completely processed. Check to see if it was the
- % smallest seq in progess. If so, we've made progress that can
- % be checkpointed.
- State2 =
- case gb_trees:smallest(SeqsInProgress) of
- {Seq, _} ->
- State#rep_state{current_through_seq=Seq};
- _ ->
- State
- end,
- checkpoint_loop(State2, gb_trees:delete(Seq,SeqsInProgress), Stats);
- NewTotalChanges when NewTotalChanges > 0 ->
- % Still some changes that need work done. Put the new count back.
- SeqsInProgress2 =
- gb_trees:update(Seq, NewTotalChanges, SeqsInProgress),
- checkpoint_loop(State, SeqsInProgress2, Stats)
- end;
- {add_stat, {StatPos, Val}} ->
- % Increment the stat at the pos.
- Stat = element(StatPos, Stats),
- Stats2 = setelement(StatPos, Stats, Stat + Val),
- checkpoint_loop(State, SeqsInProgress, Stats2);
- done ->
- % This means all the worker processes have completed their work.
- % Assert that all the seqs have been processed
- 0 = gb_trees:size(SeqsInProgress),
- State2 = do_checkpoint(State, Stats),
- cancel_timer(State2),
- receive timed_checkpoint -> ok
- after 0 -> ok
- end,
- {ok, State2, Stats};
- timed_checkpoint ->
- % every checkpoint interval while processing
- State2 = do_checkpoint(State, Stats),
- Timer = erlang:start_timer(checkpoint_interval(State),
- self(), timed_checkpoint),
- checkpoint_loop(State2#rep_state{timer=Timer}, SeqsInProgress, Stats)
- end.
-
-cancel_timer(#rep_state{timer = nil}) ->
- ok;
-cancel_timer(#rep_state{timer = Timer}) ->
- erlang:cancel_timer(Timer).
-
checkpoint_interval(_State) ->
5000.
-do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=OldSeq} = State,
- _Stats) when Seq == OldSeq ->
+do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
State;
-do_checkpoint(State, Stats) ->
+do_checkpoint(State) ->
#rep_state{
source_name=SourceName,
target_name=TargetName,
@@ -343,7 +533,8 @@ do_checkpoint(State, Stats) ->
target_log = TargetLog,
rep_starttime = ReplicationStartTime,
src_starttime = SrcInstanceStartTime,
- tgt_starttime = TgtInstanceStartTime
+ tgt_starttime = TgtInstanceStartTime,
+ stats = Stats
} = State,
case commit_to_both(Source, Target) of
{SrcInstanceStartTime, TgtInstanceStartTime} ->
@@ -414,8 +605,6 @@ commit_to_both(Source, Target) ->
{SourceStartTime, TargetStartTime}.
-
-
make_replication_id(Source, Target, UserCtx, Options) ->
%% funky algorithm to preserve backwards compatibility
{ok, HostName} = inet:gethostname(),
@@ -434,7 +623,21 @@ make_replication_id(Source, Target, User
Filter ->
[Filter, couch_util:get_value(query_params, Options, {[]})]
end,
- couch_util:to_hex(couch_util:md5(term_to_binary(Base))).
+ Extension = maybe_append_options([continuous, create_target], Options),
+ {couch_util:to_hex(couch_util:md5(term_to_binary(Base))), Extension}.
+
+
+maybe_append_options(Options, RepOptions) ->
+ lists:foldl(fun(Option, Acc) ->
+ Acc ++
+ case couch_util:get_value(Option, RepOptions, false) of
+ true ->
+ "+" ++ atom_to_list(Option);
+ false ->
+ ""
+ end
+ end, [], Options).
+
get_rep_endpoint(_UserCtx, #httpdb{url=Url, headers=Headers, oauth=OAuth}) ->
case OAuth of
Modified: couchdb/branches/new_replicator/src/couchdb/couch_server_sup.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_server_sup.erl?rev=981973&r1=981972&r2=981973&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_server_sup.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_server_sup.erl Tue Aug 3 17:16:04 2010
@@ -157,6 +157,12 @@ start_primary_services() ->
permanent,
brutal_kill,
worker,
+ dynamic},
+ {couch_replication_event,
+ {gen_event, start_link, [{local, couch_replication}]},
+ permanent,
+ brutal_kill,
+ worker,
dynamic}
]
}).