You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by be...@apache.org on 2011/12/05 10:33:30 UTC
[5/8] create couch_replicator application.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couch_replicator/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator.erl b/src/couch_replicator/src/couch_replicator.erl
new file mode 100644
index 0000000..40cb9a4
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator.erl
@@ -0,0 +1,942 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator).
+-behaviour(gen_server).
+
+% public API
+-export([replicate/1]).
+
+% meant to be used only by the replicator database listener
+-export([async_replicate/1]).
+-export([cancel_replication/1]).
+
+% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+-import(couch_util, [
+ get_value/2,
+ get_value/3,
+ to_binary/1
+]).
+
+-import(couch_replicator_utils, [
+ start_db_compaction_notifier/2,
+ stop_db_compaction_notifier/1
+]).
+
+% State of one replication job; this record is the gen_server state.
+% Fields holding {Ts, Seq} pairs: Seq is a source update sequence and Ts is
+% the batch counter attached by the changes manager (0 for the start seq).
+-record(rep_state, {
+ rep_details, % the #rep{} record the job was started with
+ source_name, % source URI (couch_api_wrap:db_uri/1), for logs/task status
+ target_name, % target URI (couch_api_wrap:db_uri/1), for logs/task status
+ source, % opened source database handle
+ target, % opened target database handle
+ history, % history entries carried over from the replication logs
+ checkpoint_history, % last checkpoint body; reported as result on normal exit
+ start_seq, % {0, Seq} the changes feed starts from
+ committed_seq, % {Ts, Seq} recorded by the last successful checkpoint
+ current_through_seq, % {Ts, Seq} below which every batch has been processed
+ seqs_in_progress = [], % ordset of {Ts, Seq} batches handed out, not yet done
+ highest_seq_done = {0, ?LOWEST_SEQ}, % highest {Ts, Seq} reported done
+ source_log, % #doc{} of the replication log in the source
+ target_log, % #doc{} of the replication log in the target
+ rep_starttime, % RFC 1123 date string of when the job started
+ src_starttime, % source instance_start_time, re-checked at checkpoints
+ tgt_starttime, % target instance_start_time, re-checked at checkpoints
+ timer, % checkpoint timer
+ changes_queue, % couch_work_queue fed by the changes reader
+ changes_manager, % pid handing out change batches to the workers
+ changes_reader, % pid reading the source _changes feed
+ workers, % pids of the worker processes still running
+ stats = #rep_stats{}, % counters accumulated from worker reports
+ session_id, % random id identifying this replication session
+ source_db_compaction_notifier = nil,
+ target_db_compaction_notifier = nil,
+ source_monitor = nil, % monitor reference for a local (#db{}) source, else nil
+ target_monitor = nil, % monitor reference for a local (#db{}) target, else nil
+ source_seq = nil % last known update_seq of the source
+}).
+
+
+%% Public entry point.
+%%
+%% With the `cancel` option set, cancels a replication: the one named by the
+%% `id` option (checked against the caller's user context), or Rep's own id
+%% when no `id` option is given. Otherwise runs the replication, listening
+%% for its result notification while it runs.
+replicate(#rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep) ->
+    case get_value(cancel, Options, false) of
+        false ->
+            {ok, ResultListener} = rep_result_listener(RepId),
+            Outcome = do_replication_loop(Rep),
+            couch_replication_notifier:stop(ResultListener),
+            Outcome;
+        true ->
+            case get_value(id, Options, nil) of
+                nil ->
+                    cancel_replication(RepId);
+                RequestedRepId ->
+                    cancel_replication(RequestedRepId, UserCtx)
+            end
+    end.
+
+
+%% Starts (or attaches to) the replication job.
+%%
+%% Continuous replications return {ok, {continuous, FullId}} right away.
+%% One-shot replications block in wait_for_result/1 until the job reports
+%% its outcome. A failed start is returned as-is.
+do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
+    StartResult = async_replicate(Rep),
+    case StartResult of
+        {ok, _Pid} ->
+            case get_value(continuous, Options, false) of
+                false ->
+                    wait_for_result(Id);
+                true ->
+                    {ok, {continuous, ?l2b(BaseId ++ Ext)}}
+            end;
+        _ ->
+            StartResult
+    end.
+
+
+%% Starts the replication job described by Rep as a temporary child of
+%% couch_rep_sup, or attaches to an already running child with the same
+%% id (BaseId ++ Ext).
+%%
+%% Returns {ok, Pid} on success. Start failures are returned as {error, _}:
+%% - a {Tag, Detail} failure reason from start_child/2 is reduced to
+%%   {error, Tag}, as before;
+%% - any other reason is returned unchanged. For example, init/1 produces a
+%%   bare atom when an atom is thrown. Previously this raised case_clause
+%%   in the caller.
+async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
+    RepChildId = BaseId ++ Ext,
+    Source = couch_api_wrap:db_uri(Src),
+    Target = couch_api_wrap:db_uri(Tgt),
+    Timeout = get_value(connection_timeout, Rep#rep.options),
+    ChildSpec = {
+        RepChildId,
+        {gen_server, start_link, [?MODULE, Rep, [{timeout, Timeout}]]},
+        temporary,
+        1,
+        worker,
+        [?MODULE]
+    },
+    % All these nested cases to attempt starting/restarting a replication child
+    % are ugly and not 100% race condition free. The following patch submission
+    % is a solution:
+    %
+    % http://erlang.2086793.n4.nabble.com/PATCH-supervisor-atomically-delete-child-spec-when-child-terminates-td3226098.html
+    %
+    case supervisor:start_child(couch_rep_sup, ChildSpec) of
+        {ok, Pid} ->
+            ?LOG_INFO("starting new replication `~s` at ~p (`~s` -> `~s`)",
+                [RepChildId, Pid, Source, Target]),
+            {ok, Pid};
+        {error, already_present} ->
+            % A child spec with this id is still registered; try to restart it.
+            case supervisor:restart_child(couch_rep_sup, RepChildId) of
+                {ok, Pid} ->
+                    ?LOG_INFO("restarting replication `~s` at ~p (`~s` -> `~s`)",
+                        [RepChildId, Pid, Source, Target]),
+                    {ok, Pid};
+                {error, running} ->
+                    %% this error occurs if multiple replicators are racing
+                    %% each other to start and somebody else won. Just grab
+                    %% the Pid by calling start_child again.
+                    {error, {already_started, Pid}} =
+                        supervisor:start_child(couch_rep_sup, ChildSpec),
+                    ?LOG_INFO("replication `~s` already running at ~p (`~s` -> `~s`)",
+                        [RepChildId, Pid, Source, Target]),
+                    {ok, Pid};
+                {error, {'EXIT', {badarg,
+                        [{erlang, apply, [gen_server, start_link, undefined]} | _]}}} ->
+                    % Clause to deal with a change in the supervisor module introduced
+                    % in R14B02. For more details consult the thread at:
+                    % http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html
+                    _ = supervisor:delete_child(couch_rep_sup, RepChildId),
+                    async_replicate(Rep);
+                {error, _} = RestartError ->
+                    RestartError
+            end;
+        {error, {already_started, Pid}} ->
+            ?LOG_INFO("replication `~s` already running at ~p (`~s` -> `~s`)",
+                [RepChildId, Pid, Source, Target]),
+            {ok, Pid};
+        {error, {Reason, _}} ->
+            {error, Reason};
+        {error, _} = StartError ->
+            % Failure reason is not a 2-tuple; hand it back untouched rather
+            % than crashing the caller with case_clause.
+            StartError
+    end.
+
+
+%% Subscribes the calling process to replication notifier events.
+%% Events shaped {Tag, RepId, Payload} for this RepId are forwarded to the
+%% caller as plain messages; all other events are ignored.
+%% Returns {ok, Listener}.
+rep_result_listener(RepId) ->
+    Caller = self(),
+    Forward = fun
+        ({_Tag, EventRepId, _Payload} = Event) when EventRepId =:= RepId ->
+            Caller ! Event;
+        (_Other) ->
+            ok
+    end,
+    {ok, _Listener} = couch_replication_notifier:start_link(Forward).
+
+
+%% Blocks until the notifier forwards this replication's outcome.
+%% Maps {finished, RepId, Result} to {ok, Result} and
+%% {error, RepId, Reason} to {error, Reason}.
+wait_for_result(RepId) ->
+    receive
+        {error, RepId, Reason} ->
+            {error, Reason};
+        {finished, RepId, RepResult} ->
+            {ok, RepResult}
+    end.
+
+
+%% Stops the replication child {BaseId, Extension} and removes its child
+%% spec from couch_rep_sup.
+%%
+%% Returns {ok, {cancelled, FullIdBinary}} when the child was stopped. A spec
+%% that is already gone also counts as success. Otherwise returns the
+%% supervisor's error.
+cancel_replication({BaseId, Extension}) ->
+    ChildId = BaseId ++ Extension,
+    ?LOG_INFO("Canceling replication `~s`...", [ChildId]),
+    case supervisor:terminate_child(couch_rep_sup, ChildId) of
+        ok ->
+            ?LOG_INFO("Replication `~s` canceled.", [ChildId]),
+            case supervisor:delete_child(couch_rep_sup, ChildId) of
+                Deleted when Deleted =:= ok; Deleted =:= {error, not_found} ->
+                    {ok, {cancelled, ?l2b(ChildId)}};
+                DeleteError ->
+                    DeleteError
+            end;
+        TerminateError ->
+            ?LOG_ERROR("Error canceling replication `~s`: ~p",
+                [ChildId, TerminateError]),
+            TerminateError
+    end.
+
+%% Cancels RepId on behalf of a user.
+%%
+%% Admins may cancel any replication. Other users may only cancel a running
+%% replication whose #rep.user_ctx has the same name; otherwise
+%% {unauthorized, _} is thrown. Returns {error, not_found} when no live
+%% child with that id exists.
+cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
+    case lists:member(<<"_admin">>, Roles) of
+        true ->
+            cancel_replication(RepId);
+        false ->
+            {BaseId, Ext} = RepId,
+            ChildId = BaseId ++ Ext,
+            Children = supervisor:which_children(couch_rep_sup),
+            case lists:keyfind(ChildId, 1, Children) of
+                {_, Pid, _, _} when is_pid(Pid) ->
+                    Details = (catch gen_server:call(Pid, get_details, infinity)),
+                    case Details of
+                        {ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
+                            cancel_replication(RepId);
+                        {ok, _} ->
+                            throw({unauthorized,
+                                <<"Can't cancel a replication triggered by another user">>});
+                        {'EXIT', {noproc, {gen_server, call, _}}} ->
+                            {error, not_found};
+                        Other ->
+                            throw(Other)
+                    end;
+                _ ->
+                    {error, not_found}
+            end
+    end.
+
+%% gen_server init callback.
+%% Delegates to do_init/1 and converts values thrown there into {stop, _}.
+%% unauthorized and db_not_found reasons get a readable message attached.
+init(InitArgs) ->
+    try do_init(InitArgs)
+    catch
+        throw:{unauthorized, DbUri} ->
+            Message = <<"unauthorized to access or create database ", DbUri/binary>>,
+            {stop, {unauthorized, Message}};
+        throw:{db_not_found, DbUri} ->
+            Message = <<"could not open ", DbUri/binary>>,
+            {stop, {db_not_found, Message}};
+        throw:Other ->
+            {stop, Other}
+    end.
+
+% Start-up work behind init/1:
+% - builds the initial state (init_state/1);
+% - creates the changes queue;
+% - spawns the changes reader, the changes manager and NumWorkers workers;
+% - registers the job with couch_task_status and logs its settings;
+% - tells couch_replication_manager the replication started.
+% Values thrown from here are turned into {stop, _} by init/1.
+do_init(#rep{options = Options, id = {BaseId, Ext}} = Rep) ->
+ % Exits of linked helpers then arrive as 'EXIT' messages (see handle_info/2).
+ process_flag(trap_exit, true),
+
+ #rep_state{
+ source = Source,
+ target = Target,
+ source_name = SourceName,
+ target_name = TargetName,
+ start_seq = {_Ts, StartSeq},
+ source_seq = SourceCurSeq,
+ committed_seq = {_, CommittedSeq}
+ } = State = init_state(Rep),
+
+ NumWorkers = get_value(worker_processes, Options),
+ BatchSize = get_value(worker_batch_size, Options),
+ % Queue bounds grow with the number of workers (room for two batches per
+ % worker). NOTE(review): max_size unit assumed to be bytes -- confirm.
+ {ok, ChangesQueue} = couch_work_queue:new([
+ {max_items, BatchSize * NumWorkers * 2},
+ {max_size, 100 * 1024 * NumWorkers}
+ ]),
+ % This starts the _changes reader process. It adds the changes from
+ % the source db to the ChangesQueue.
+ ChangesReader = spawn_changes_reader(StartSeq, Source, ChangesQueue, Options),
+ % Changes manager - responsible for dequeuing batches from the changes queue
+ % and delivering them to the worker processes.
+ ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
+ % This starts the worker processes. They ask the changes manager for a
+ % batch of _changes rows to process, check which revs are missing in the
+ % target and copy the missing ones from the source to the target.
+ MaxConns = get_value(http_connections, Options),
+ Workers = lists:map(
+ fun(_) ->
+ {ok, Pid} = couch_replicator_worker:start_link(
+ self(), Source, Target, ChangesManager, MaxConns),
+ Pid
+ end,
+ lists:seq(1, NumWorkers)),
+
+ couch_task_status:add_task([
+ {type, replication},
+ {replication_id, ?l2b(BaseId ++ Ext)},
+ {source, ?l2b(SourceName)},
+ {target, ?l2b(TargetName)},
+ {continuous, get_value(continuous, Options, false)},
+ {revisions_checked, 0},
+ {missing_revisions_found, 0},
+ {docs_read, 0},
+ {docs_written, 0},
+ {doc_write_failures, 0},
+ {source_seq, SourceCurSeq},
+ {checkpointed_source_seq, CommittedSeq},
+ {progress, 0}
+ ]),
+ couch_task_status:set_update_frequency(1000),
+
+ % Until OTP R14B03:
+ %
+ % Restarting a temporary supervised child implies that the original arguments
+ % (#rep{} record) specified in the MFA component of the supervisor
+ % child spec will always be used whenever the child is restarted.
+ % This implies the same replication performance tuning parameters will
+ % always be used. The solution is to delete the child spec (see
+ % cancel_replication/1) and then start the replication again, but this is
+ % unfortunately not immune to race conditions.
+
+ % The last ~s is empty unless the job starts from a non-initial sequence.
+ ?LOG_INFO("Replication `~p` is using:~n"
+ "~c~p worker processes~n"
+ "~ca worker batch size of ~p~n"
+ "~c~p HTTP connections~n"
+ "~ca connection timeout of ~p milliseconds~n"
+ "~c~p retries per request~n"
+ "~csocket options are: ~s~s",
+ [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
+ MaxConns, $\t, get_value(connection_timeout, Options),
+ $\t, get_value(retries, Options),
+ $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
+ case StartSeq of
+ ?LOWEST_SEQ ->
+ "";
+ _ ->
+ io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
+ end]),
+
+ ?LOG_DEBUG("Worker pids are: ~p", [Workers]),
+
+ couch_replication_manager:replication_started(Rep),
+
+ {ok, State#rep_state{
+ changes_queue = ChangesQueue,
+ changes_manager = ChangesManager,
+ changes_reader = ChangesReader,
+ workers = Workers
+ }
+ }.
+
+
+% Process-level events (this process traps exits, see do_init/1).
+% - A DOWN message for the monitored local source or target stops the job.
+% - A normal exit of the changes reader, manager or queue is ignored. An
+%   abnormal exit of any of them stops the job.
+% - When the last worker exits normally, the job makes a final checkpoint
+%   and stops.
+% - An exit from an unknown pid also stops the job.
+% There is no catch-all clause: any other message fails with function_clause.
+% Clause order matters: the helper-specific clauses must precede the worker
+% clauses.
+handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
+ ?LOG_ERROR("Source database is down. Reason: ~p", [Why]),
+ {stop, source_db_down, St};
+
+handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
+ ?LOG_ERROR("Target database is down. Reason: ~p", [Why]),
+ {stop, target_db_down, St};
+
+% The changes reader exiting normally just means the _changes feed ended.
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
+ {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
+ ?LOG_ERROR("ChangesReader process died with reason: ~p", [Reason]),
+ {stop, changes_reader_died, cancel_timer(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
+ {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
+ ?LOG_ERROR("ChangesManager process died with reason: ~p", [Reason]),
+ {stop, changes_manager_died, cancel_timer(State)};
+
+% NOTE(review): assumes couch_work_queue:new/1 links the queue process to
+% this one, so its exits arrive here -- confirm.
+handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
+ {noreply, State};
+
+handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
+ ?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]),
+ {stop, changes_queue_died, cancel_timer(State)};
+
+handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
+ case Workers -- [Pid] of
+ Workers ->
+ % Pid is not one of our workers.
+ {stop, {unknown_process_died, Pid, normal}, State};
+ [] ->
+ % Last worker finished: kill the changes manager, then checkpoint.
+ catch unlink(State#rep_state.changes_manager),
+ catch exit(State#rep_state.changes_manager, kill),
+ do_last_checkpoint(State);
+ Workers2 ->
+ {noreply, State#rep_state{workers = Workers2}}
+ end;
+
+% Abnormal exit of a worker or of an unknown linked process: stop the job.
+handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
+ State2 = cancel_timer(State),
+ case lists:member(Pid, Workers) of
+ false ->
+ {stop, {unknown_process_died, Pid, Reason}, State2};
+ true ->
+ ?LOG_ERROR("Worker ~p died with reason: ~p", [Pid, Reason]),
+ {stop, {worker_died, Pid, Reason}, State2}
+ end.
+
+
+% Synchronous requests:
+% - get_details: returns {ok, Rep}, the #rep{} the job was started with.
+% - {add_stats, Stats}: replies ok at once, then adds Stats to the totals.
+% - {report_seq_done, Seq, StatsInc}: a worker finished the batch tagged
+%   Seq ({Ts, HighSeq}). Replies ok at once, then advances the bookkeeping
+%   used for checkpointing.
+handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
+ {reply, {ok, Rep}, State};
+
+handle_call({add_stats, Stats}, From, State) ->
+ gen_server:reply(From, ok),
+ NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
+ {noreply, State#rep_state{stats = NewStats}};
+
+handle_call({report_seq_done, Seq, StatsInc}, From,
+ #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
+ current_through_seq = ThroughSeq, stats = Stats} = State) ->
+ gen_server:reply(From, ok),
+ % Seq must be in SeqsInProgress; an empty list raises case_clause here.
+ % NOTE(review): this relies on the changes manager's {report_seq, Seq}
+ % cast being processed before the worker's call -- confirm.
+ % If Seq was the lowest batch in flight, all work up to Seq is done;
+ % otherwise the through seq cannot advance yet.
+ {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
+ [Seq | Rest] ->
+ {Seq, Rest};
+ [_ | _] ->
+ {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
+ end,
+ NewHighestDone = lists:max([HighestDone, Seq]),
+ % With nothing left in flight, everything up to the highest seq done is done.
+ NewThroughSeq = case NewSeqsInProgress of
+ [] ->
+ lists:max([NewThroughSeq0, NewHighestDone]);
+ _ ->
+ NewThroughSeq0
+ end,
+ ?LOG_DEBUG("Worker reported seq ~p, through seq was ~p, "
+ "new through seq is ~p, highest seq done was ~p, "
+ "new highest seq done is ~p~n"
+ "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
+ [Seq, ThroughSeq, NewThroughSeq, HighestDone,
+ NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
+ SourceCurSeq = source_cur_seq(State),
+ NewState = State#rep_state{
+ stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
+ current_through_seq = NewThroughSeq,
+ seqs_in_progress = NewSeqsInProgress,
+ highest_seq_done = NewHighestDone,
+ source_seq = SourceCurSeq
+ },
+ update_task(NewState),
+ {noreply, NewState}.
+
+
+%% Asynchronous messages:
+%% - {db_compacted, DbName}: reopens the local source or target handle with
+%%   that name. The source is checked first.
+%% - checkpoint: records a checkpoint and re-arms the timer; a failed
+%%   checkpoint stops the job with the failure as reason.
+%% - {report_seq, Seq}: records that the batch tagged Seq was handed out.
+handle_cast({db_compacted, DbName},
+        #rep_state{source = #db{name = DbName} = SrcDb} = State) ->
+    {ok, ReopenedSrc} = couch_db:reopen(SrcDb),
+    {noreply, State#rep_state{source = ReopenedSrc}};
+
+handle_cast({db_compacted, DbName},
+        #rep_state{target = #db{name = DbName} = TgtDb} = State) ->
+    {ok, ReopenedTgt} = couch_db:reopen(TgtDb),
+    {noreply, State#rep_state{target = ReopenedTgt}};
+
+handle_cast(checkpoint, State) ->
+    Outcome = do_checkpoint(State),
+    case Outcome of
+        {ok, Checkpointed} ->
+            NextTimer = start_timer(State),
+            {noreply, Checkpointed#rep_state{timer = NextTimer}};
+        _ ->
+            {stop, Outcome, State}
+    end;
+
+handle_cast({report_seq, Seq}, #rep_state{seqs_in_progress = InFlight} = State) ->
+    StillInFlight = ordsets:add_element(Seq, InFlight),
+    {noreply, State#rep_state{seqs_in_progress = StillInFlight}}.
+
+
+%% Hot code upgrade: the state record is carried over unchanged.
+code_change(_OldVsn, CurrentState, _Extra) ->
+    {ok, CurrentState}.
+
+
+%% gen_server terminate callback.
+%% - normal: the replication finished. Clean up, notify listeners with the
+%%   last checkpoint body and tell the replication manager.
+%% - shutdown: the job was cancelled; listeners get an error "cancelled".
+%% - anything else: log the failure, clean up and report the error.
+terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
+        checkpoint_history = FinalHistory} = State) ->
+    terminate_cleanup(State),
+    couch_replication_notifier:notify({finished, RepId, FinalHistory}),
+    couch_replication_manager:replication_completed(Rep);
+
+terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
+    % cancelled replication through ?MODULE:cancel_replication/1
+    couch_replication_notifier:notify({error, RepId, <<"cancelled">>}),
+    terminate_cleanup(State);
+
+terminate(Reason, State) ->
+    #rep_state{
+        rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep,
+        source_name = SrcName,
+        target_name = TgtName
+    } = State,
+    ?LOG_ERROR("Replication `~s` (`~s` -> `~s`) failed: ~s",
+        [BaseId ++ Ext, SrcName, TgtName, to_binary(Reason)]),
+    terminate_cleanup(State),
+    couch_replication_notifier:notify({error, RepId, Reason}),
+    couch_replication_manager:replication_error(Rep, Reason).
+
+
+%% Publishes the final task status.
+%% Then stops both compaction notifiers (source first) and closes the source
+%% and target handles. Returns the result of closing the target.
+terminate_cleanup(State) ->
+    update_task(State),
+    #rep_state{
+        source = SrcDb,
+        target = TgtDb,
+        source_db_compaction_notifier = SrcNotifier,
+        target_db_compaction_notifier = TgtNotifier
+    } = State,
+    lists:foreach(
+        fun(Notifier) -> stop_db_compaction_notifier(Notifier) end,
+        [SrcNotifier, TgtNotifier]),
+    couch_api_wrap:db_close(SrcDb),
+    couch_api_wrap:db_close(TgtDb).
+
+
+%% Final step once every worker has exited normally; nothing may be in flight.
+%% If no batch ever completed there is nothing to record, so the job just
+%% stops. Otherwise it checkpoints at the highest seq done, then stops
+%% (normally, or with the checkpoint failure as reason).
+do_last_checkpoint(#rep_state{seqs_in_progress = [],
+        highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
+    {stop, normal, cancel_timer(State)};
+do_last_checkpoint(#rep_state{seqs_in_progress = [],
+        highest_seq_done = LastDone} = State) ->
+    FinalState = State#rep_state{current_through_seq = LastDone},
+    case do_checkpoint(FinalState) of
+        {ok, Checkpointed} ->
+            {stop, normal, cancel_timer(Checkpointed)};
+        Failure ->
+            {stop, Failure, State}
+    end.
+
+
+%% Schedules a `checkpoint` cast to this process after
+%% checkpoint_interval/1 milliseconds.
+%% Returns the timer reference, or nil (after logging) if the timer could
+%% not be created.
+start_timer(State) ->
+    Delay = checkpoint_interval(State),
+    Scheduled = timer:apply_after(Delay, gen_server, cast, [self(), checkpoint]),
+    case Scheduled of
+        {ok, TimerRef} ->
+            TimerRef;
+        Failure ->
+            ?LOG_ERROR("Replicator, error scheduling checkpoint: ~p", [Failure]),
+            nil
+    end.
+
+
+%% Cancels the pending checkpoint timer, if any, and clears it from State.
+cancel_timer(#rep_state{timer = TimerRef} = State) ->
+    case TimerRef of
+        nil ->
+            State;
+        _ ->
+            {ok, cancel} = timer:cancel(TimerRef),
+            State#rep_state{timer = nil}
+    end.
+
+
+% Builds the initial #rep_state{} for a job. It:
+% - opens source and target (the target is created when the create_target
+%   option is true);
+% - loads the replication log document from both;
+% - picks the start sequence (the since_seq option overrides the one
+%   derived from the logs);
+% - subscribes to compaction notifications and monitors local databases;
+% - arms the first checkpoint timer.
+init_state(Rep) ->
+ #rep{
+ source = Src, target = Tgt,
+ options = Options, user_ctx = UserCtx
+ } = Rep,
+ {ok, Source} = couch_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
+ {ok, Target} = couch_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
+ get_value(create_target, Options, false)),
+
+ {ok, SourceInfo} = couch_api_wrap:get_db_info(Source),
+ {ok, TargetInfo} = couch_api_wrap:get_db_info(Target),
+
+ [SourceLog, TargetLog] = find_replication_logs([Source, Target], Rep),
+
+ {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
+ StartSeq1 = get_value(since_seq, Options, StartSeq0),
+ % Batch counter 0: ranks below any {Ts, Seq} produced by the changes manager.
+ StartSeq = {0, StartSeq1},
+ #doc{body={CheckpointHistory}} = SourceLog,
+ State = #rep_state{
+ rep_details = Rep,
+ source_name = couch_api_wrap:db_uri(Source),
+ target_name = couch_api_wrap:db_uri(Target),
+ source = Source,
+ target = Target,
+ history = History,
+ % Reported as the result if the job stops normally before any checkpoint.
+ checkpoint_history = {[{<<"no_changes">>, true}| CheckpointHistory]},
+ start_seq = StartSeq,
+ current_through_seq = StartSeq,
+ committed_seq = StartSeq,
+ source_log = SourceLog,
+ target_log = TargetLog,
+ rep_starttime = httpd_util:rfc1123_date(),
+ % Compared against ensure_full_commit results in do_checkpoint/1.
+ src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
+ tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
+ session_id = couch_uuids:random(),
+ source_db_compaction_notifier =
+ start_db_compaction_notifier(Source, self()),
+ target_db_compaction_notifier =
+ start_db_compaction_notifier(Target, self()),
+ source_monitor = db_monitor(Source),
+ target_monitor = db_monitor(Target),
+ source_seq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ)
+ },
+ State#rep_state{timer = start_timer(State)}.
+
+
+%% Fetches the replication log document from each db in DbList.
+%% Results are in the same order as DbList. The log id is derived from the
+%% current replication id version; fold_replication_logs/6 handles fallbacks
+%% to older versions.
+find_replication_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
+    CurrentLogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
+    InitialVsn = ?REP_ID_VERSION,
+    fold_replication_logs(DbList, InitialVsn, CurrentLogId, CurrentLogId, Rep, []).
+
+
+% Looks up the replication log for each db in turn.
+% - When the log is missing under LogId, ids of older replication id
+%   versions are tried (Vsn - 1 down to 1) against the same db.
+% - A log found under an old id is migrated: it is returned as a new #doc{}
+%   with the current id NewId and the old body (old revs are dropped).
+% - A db with no log under any version gets an empty #doc{id = NewId}.
+% The result keeps the order of the db list.
+fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
+ lists:reverse(Acc);
+
+fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
+ case couch_api_wrap:open_doc(Db, LogId, [ejson_body]) of
+ {error, <<"not_found">>} when Vsn > 1 ->
+ % Retry this same db with the id produced by the previous version.
+ OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
+ fold_replication_logs(Dbs, Vsn - 1,
+ ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
+ {error, <<"not_found">>} ->
+ fold_replication_logs(
+ Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]);
+ {ok, Doc} when LogId =:= NewId ->
+ fold_replication_logs(
+ Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
+ {ok, Doc} ->
+ MigratedLog = #doc{id = NewId, body = Doc#doc.body},
+ fold_replication_logs(
+ Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc])
+ end.
+
+
+%% Spawns (linked) the process that reads the source _changes feed into
+%% ChangesQueue.
+%% For HTTP sources, the configured retry count moves into the reader's
+%% process dictionary (retries_left) and per-request retries are disabled.
+%% This lets read_changes/4 resume from the last seen sequence itself.
+spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) ->
+    ReaderFun = case Db of
+        #httpdb{retries = Retries} ->
+            fun() ->
+                put(last_seq, StartSeq),
+                put(retries_left, Retries),
+                read_changes(StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options)
+            end;
+        _ ->
+            fun() ->
+                read_changes(StartSeq, Db, ChangesQueue, Options)
+            end
+    end,
+    spawn_link(ReaderFun).
+
+%% Body of the changes reader process.
+%% Streams the source _changes feed from StartSeq and queues every
+%% #doc_info{} into ChangesQueue, except docs with an empty id. The queue is
+%% closed when the feed ends. The last seen sequence is kept in the process
+%% dictionary (last_seq).
+%%
+%% Retries of a failed HTTP _changes request:
+%% - They happen only while `retries_left` (seeded by spawn_changes_reader/4
+%%   for HTTP sources) is a positive integer.
+%% - A retry resumes from last_seq. When no progress was made since StartSeq,
+%%   it first sleeps for the current wait and doubles it.
+%% - The is_integer/1 guard matters: local sources have no retries_left
+%%   (get/1 returns undefined), and in Erlang term order the atom undefined
+%%   compares greater than any number. Without the guard, such a failure
+%%   would crash with badarith instead of re-raising the original exit.
+read_changes(StartSeq, Db, ChangesQueue, Options) ->
+    try
+        couch_api_wrap:changes_since(Db, all_docs, StartSeq,
+            fun(#doc_info{high_seq = Seq, id = Id} = DocInfo) ->
+                case Id of
+                    <<>> ->
+                        % Previous CouchDB releases had a bug which allowed a doc
+                        % with an empty ID to be inserted into databases. Such doc
+                        % is impossible to GET.
+                        ?LOG_ERROR("Replicator: ignoring document with empty ID in "
+                            "source database `~s` (_changes sequence ~p)",
+                            [couch_api_wrap:db_uri(Db), Seq]);
+                    _ ->
+                        ok = couch_work_queue:queue(ChangesQueue, DocInfo)
+                end,
+                put(last_seq, Seq)
+            end, Options),
+        couch_work_queue:close(ChangesQueue)
+    catch exit:{http_request_failed, _, _, _} = Error ->
+        case get(retries_left) of
+            N when is_integer(N), N > 0 ->
+                put(retries_left, N - 1),
+                LastSeq = get(last_seq),
+                Db2 = case LastSeq of
+                    StartSeq ->
+                        % No progress since the last attempt: back off.
+                        ?LOG_INFO("Retrying _changes request to source database ~s"
+                            " with since=~p in ~p seconds",
+                            [couch_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]),
+                        ok = timer:sleep(Db#httpdb.wait),
+                        Db#httpdb{wait = 2 * Db#httpdb.wait};
+                    _ ->
+                        ?LOG_INFO("Retrying _changes request to source database ~s"
+                            " with since=~p", [couch_api_wrap:db_uri(Db), LastSeq]),
+                        Db
+                end,
+                read_changes(LastSeq, Db2, ChangesQueue, Options);
+            _ ->
+                exit(Error)
+        end
+    end.
+
+
+%% Spawns (linked) the changes manager serving batches from ChangesQueue.
+%% Batch counters start at 1.
+spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
+    FirstTs = 1,
+    ManagerFun = fun() ->
+        changes_manager_loop_open(Parent, ChangesQueue, BatchSize, FirstTs)
+    end,
+    spawn_link(ManagerFun).
+
+%% Loop of the changes manager. Each {get_changes, From} request dequeues up
+%% to BatchSize changes.
+%% - A batch is tagged {Ts, HighSeq}, where HighSeq is the high_seq of its
+%%   last change. The tag is announced to Parent with a {report_seq, Tag}
+%%   cast before the batch is sent to From.
+%% - A closed queue is answered with {closed, self()}.
+%% Ts grows by one per request.
+changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
+    receive
+        {get_changes, From} ->
+            Reply = case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
+                closed ->
+                    {closed, self()};
+                {ok, Changes} ->
+                    #doc_info{high_seq = LastSeq} = lists:last(Changes),
+                    Tag = {Ts, LastSeq},
+                    ok = gen_server:cast(Parent, {report_seq, Tag}),
+                    {changes, self(), Changes, Tag}
+            end,
+            From ! Reply,
+            changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
+    end.
+
+
+%% Delay between periodic checkpoints, in milliseconds.
+%% Currently fixed, independent of the job state.
+checkpoint_interval(_State) ->
+    5 * 1000.
+
+% Records a checkpoint at current_through_seq.
+% - If that seq is already committed, only the source seq and task status
+%   are refreshed.
+% - Otherwise both databases are fully committed and the instance start
+%   times returned are compared with those seen at start-up. If they
+%   match, an updated replication log (history capped at 50 entries) is
+%   written to source and target.
+% Returns {ok, NewState} or {checkpoint_commit_failure, Binary}.
+do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
+ SourceCurSeq = source_cur_seq(State),
+ NewState = State#rep_state{source_seq = SourceCurSeq},
+ update_task(NewState),
+ {ok, NewState};
+do_checkpoint(State) ->
+ #rep_state{
+ source_name=SourceName,
+ target_name=TargetName,
+ source = Source,
+ target = Target,
+ history = OldHistory,
+ start_seq = {_, StartSeq},
+ current_through_seq = {_Ts, NewSeq} = NewTsSeq,
+ source_log = SourceLog,
+ target_log = TargetLog,
+ rep_starttime = ReplicationStartTime,
+ src_starttime = SrcInstanceStartTime,
+ tgt_starttime = TgtInstanceStartTime,
+ stats = Stats,
+ rep_details = #rep{options = Options},
+ session_id = SessionId
+ } = State,
+ % SrcInstanceStartTime/TgtInstanceStartTime are already bound, so the
+ % clauses below distinguish which side's start time has changed.
+ case commit_to_both(Source, Target) of
+ {source_error, Reason} ->
+ {checkpoint_commit_failure,
+ <<"Failure on source commit: ", (to_binary(Reason))/binary>>};
+ {target_error, Reason} ->
+ {checkpoint_commit_failure,
+ <<"Failure on target commit: ", (to_binary(Reason))/binary>>};
+ {SrcInstanceStartTime, TgtInstanceStartTime} ->
+ ?LOG_INFO("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
+ [SourceName, TargetName, NewSeq]),
+ StartTime = ?l2b(ReplicationStartTime),
+ EndTime = ?l2b(httpd_util:rfc1123_date()),
+ NewHistoryEntry = {[
+ {<<"session_id">>, SessionId},
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
+ {<<"start_last_seq">>, StartSeq},
+ {<<"end_last_seq">>, NewSeq},
+ {<<"recorded_seq">>, NewSeq},
+ {<<"missing_checked">>, Stats#rep_stats.missing_checked},
+ {<<"missing_found">>, Stats#rep_stats.missing_found},
+ {<<"docs_read">>, Stats#rep_stats.docs_read},
+ {<<"docs_written">>, Stats#rep_stats.docs_written},
+ {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
+ ]},
+ BaseHistory = [
+ {<<"session_id">>, SessionId},
+ {<<"source_last_seq">>, NewSeq},
+ {<<"replication_id_version">>, ?REP_ID_VERSION}
+ ] ++ case get_value(doc_ids, Options) of
+ undefined ->
+ [];
+ _DocIds ->
+ % backwards compatibility with the result of a replication by
+ % doc IDs in versions 0.11.x and 1.0.x
+ % TODO: deprecate (use same history format, simplify code)
+ [
+ {<<"start_time">>, StartTime},
+ {<<"end_time">>, EndTime},
+ {<<"docs_read">>, Stats#rep_stats.docs_read},
+ {<<"docs_written">>, Stats#rep_stats.docs_written},
+ {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
+ ]
+ end,
+ % limit history to 50 entries
+ NewRepHistory = {
+ BaseHistory ++
+ [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
+ },
+
+ % Source log first, then target; a failure on either aborts the checkpoint.
+ try
+ {SrcRevPos, SrcRevId} = update_checkpoint(
+ Source, SourceLog#doc{body = NewRepHistory}, source),
+ {TgtRevPos, TgtRevId} = update_checkpoint(
+ Target, TargetLog#doc{body = NewRepHistory}, target),
+ SourceCurSeq = source_cur_seq(State),
+ NewState = State#rep_state{
+ source_seq = SourceCurSeq,
+ checkpoint_history = NewRepHistory,
+ committed_seq = NewTsSeq,
+ source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
+ target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
+ },
+ update_task(NewState),
+ {ok, NewState}
+ catch throw:{checkpoint_commit_failure, _} = Failure ->
+ Failure
+ end;
+ {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
+ {checkpoint_commit_failure, <<"Target database out of sync. "
+ "Try to increase max_dbs_open at the target's server.">>};
+ {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
+ {checkpoint_commit_failure, <<"Source database out of sync. "
+ "Try to increase max_dbs_open at the source's server.">>};
+ {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
+ {checkpoint_commit_failure, <<"Source and target databases out of "
+ "sync. Try to increase max_dbs_open at both servers.">>}
+ end.
+
+
+%% Writes the checkpoint document Doc to Db (see update_checkpoint/2).
+%% DbType (e.g. source or target) is only used to prefix the reason of a
+%% re-thrown checkpoint_commit_failure.
+update_checkpoint(Db, Doc, DbType) ->
+    try update_checkpoint(Db, Doc)
+    catch
+        throw:{checkpoint_commit_failure, Reason} ->
+            TypeBin = to_binary(DbType),
+            ReasonBin = to_binary(Reason),
+            Message = <<"Error updating the ", TypeBin/binary,
+                " checkpoint document: ", ReasonBin/binary>>,
+            throw({checkpoint_commit_failure, Message})
+    end.
+
+% Writes the replication log document with delay_commit and returns its new
+% {Pos, RevId}. Failures are thrown as {checkpoint_commit_failure, Reason}.
+% A thrown conflict is tolerated only when the stored doc already has exactly
+% the body being written (see the comment in the conflict branch).
+update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
+ try
+ case couch_api_wrap:update_doc(Db, Doc, [delay_commit]) of
+ {ok, PosRevId} ->
+ PosRevId;
+ {error, Reason} ->
+ throw({checkpoint_commit_failure, Reason})
+ end
+ catch throw:conflict ->
+ % LogBody is bound, so the first clause matches only an identical body.
+ case (catch couch_api_wrap:open_doc(Db, LogId, [ejson_body])) of
+ {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
+ % This means that we were able to update successfully the
+ % checkpoint doc in a previous attempt but we got a connection
+ % error (timeout for e.g.) before receiving the success response.
+ % Therefore the request was retried and we got a conflict, as the
+ % revision we sent is not the current one.
+ % We confirm this by verifying the doc body we just got is the same
+ % that we have just sent.
+ {Pos, RevId};
+ _ ->
+ throw({checkpoint_commit_failure, conflict})
+ end
+ end.
+
+
+% Runs ensure_full_commit on the source (in a linked helper process) and on
+% the target (in this process) concurrently.
+% - Returns {SourceStartTime, TargetStartTime} when both succeed.
+% - Otherwise returns {target_error, _} (checked first) or {source_error, _}.
+% The caller compares the start times with those seen at start-up.
+commit_to_both(Source, Target) ->
+ % commit the src async
+ ParentPid = self(),
+ SrcCommitPid = spawn_link(
+ fun() ->
+ Result = (catch couch_api_wrap:ensure_full_commit(Source)),
+ ParentPid ! {self(), Result}
+ end),
+
+ % commit tgt sync
+ TargetResult = (catch couch_api_wrap:ensure_full_commit(Target)),
+
+ % This process traps exits, so a helper crash arrives as an 'EXIT' message.
+ SourceResult = receive
+ {SrcCommitPid, Result} ->
+ unlink(SrcCommitPid),
+ % Drop the helper's exit message if it was already delivered.
+ receive {'EXIT', SrcCommitPid, _} -> ok after 0 -> ok end,
+ Result;
+ {'EXIT', SrcCommitPid, Reason} ->
+ {error, Reason}
+ end,
+ case TargetResult of
+ {ok, TargetStartTime} ->
+ case SourceResult of
+ {ok, SourceStartTime} ->
+ {SourceStartTime, TargetStartTime};
+ SourceError ->
+ {source_error, SourceError}
+ end;
+ TargetError ->
+ {target_error, TargetError}
+ end.
+
+
+%% Decides where to resume from, given the source and target log docs.
+%% - Equal session ids: the last checkpoint was recorded on both sides, so
+%%   resume from the source log's source_last_seq with its history.
+%% - Otherwise: search both histories for a common session.
+%% Returns {StartSeq, History}.
+compare_replication_logs(SrcDoc, TgtDoc) ->
+    #doc{body = {SrcProps}} = SrcDoc,
+    #doc{body = {TgtProps}} = TgtDoc,
+    SrcSession = get_value(<<"session_id">>, SrcProps),
+    TgtSession = get_value(<<"session_id">>, TgtProps),
+    if
+        SrcSession == TgtSession ->
+            {get_value(<<"source_last_seq">>, SrcProps, ?LOWEST_SEQ),
+                get_value(<<"history">>, SrcProps, [])};
+        true ->
+            ?LOG_INFO("Replication records differ. "
+                "Scanning histories to find a common ancestor.", []),
+            ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
+                [SrcProps, TgtProps]),
+            compare_rep_history(get_value(<<"history">>, SrcProps, []),
+                get_value(<<"history">>, TgtProps, []))
+    end.
+
+%% Walks the source and target histories in lockstep, looking for a session
+%% present on both sides.
+%% - At each step the current source entry is checked against the whole
+%%   remaining target history, then the current target entry against the
+%%   rest of the source history.
+%% - On a hit, returns {RecordedSeq, RemainingHistory} from that entry.
+%% - When either history runs out, returns {?LOWEST_SEQ, []} (full
+%%   replication).
+compare_rep_history(SrcHist, TgtHist) when SrcHist =:= []; TgtHist =:= [] ->
+    ?LOG_INFO("no common ancestry -- performing full replication", []),
+    {?LOWEST_SEQ, []};
+compare_rep_history([{SrcEntry} | SrcRest], [{TgtEntry} | TgtRest] = TgtHist) ->
+    SrcSession = get_value(<<"session_id">>, SrcEntry),
+    case has_session_id(SrcSession, TgtHist) of
+        true ->
+            common_history_found(SrcEntry, SrcRest);
+        false ->
+            TgtSession = get_value(<<"session_id">>, TgtEntry),
+            case has_session_id(TgtSession, SrcRest) of
+                true ->
+                    common_history_found(TgtEntry, TgtRest);
+                false ->
+                    compare_rep_history(SrcRest, TgtRest)
+            end
+    end.
+
+%% Logs and returns the resume point taken from a shared history entry.
+common_history_found(EntryProps, RemainingHistory) ->
+    RecordSeqNum = get_value(<<"recorded_seq">>, EntryProps, ?LOWEST_SEQ),
+    ?LOG_INFO("found a common replication record with source_seq ~p",
+        [RecordSeqNum]),
+    {RecordSeqNum, RemainingHistory}.
+
+
+%% True when some {Props} entry of History has session_id =:= SessionId.
+%% An entry without a session_id counts as nil.
+has_session_id(SessionId, History) ->
+    lists:any(
+        fun({Props}) -> get_value(<<"session_id">>, Props, nil) =:= SessionId end,
+        History).
+
+
+%% Monitors a local database (#db{}) and returns the result of
+%% couch_db:monitor/1. Any other handle (e.g. HTTP) is not monitored: nil.
+db_monitor(Db) ->
+    case Db of
+        #db{} -> couch_db:monitor(Db);
+        _ -> nil
+    end.
+
+
+%% Current update_seq of the source, defaulting to the last known value.
+%% - HTTP sources are queried with 3 retries; any failure returns the known
+%%   value.
+%% - Local sources are queried directly.
+source_cur_seq(#rep_state{source = SrcDb, source_seq = KnownSeq}) ->
+    case SrcDb of
+        #httpdb{} ->
+            case (catch couch_api_wrap:get_db_info(SrcDb#httpdb{retries = 3})) of
+                {ok, RemoteInfo} ->
+                    get_value(<<"update_seq">>, RemoteInfo, KnownSeq);
+                _ ->
+                    KnownSeq
+            end;
+        _ ->
+            {ok, LocalInfo} = couch_api_wrap:get_db_info(SrcDb),
+            get_value(<<"update_seq">>, LocalInfo, KnownSeq)
+    end.
+
+
+%% Publishes the job's counters and sequence numbers to couch_task_status.
+%% progress is the through seq as an integer percentage of the source
+%% update_seq when both are numbers; 0 when the source seq is exactly 0;
+%% null otherwise.
+update_task(State) ->
+    #rep_state{
+        current_through_seq = {_, CurSeq},
+        committed_seq = {_, CommittedSeq},
+        source_seq = SourceCurSeq,
+        stats = Stats
+    } = State,
+    Progress = if
+        not (is_number(CurSeq) andalso is_number(SourceCurSeq)) -> null;
+        SourceCurSeq =:= 0 -> 0;
+        true -> (CurSeq * 100) div SourceCurSeq
+    end,
+    couch_task_status:update([
+        {revisions_checked, Stats#rep_stats.missing_checked},
+        {missing_revisions_found, Stats#rep_stats.missing_found},
+        {docs_read, Stats#rep_stats.docs_read},
+        {docs_written, Stats#rep_stats.docs_written},
+        {doc_write_failures, Stats#rep_stats.doc_write_failures},
+        {source_seq, SourceCurSeq},
+        {checkpointed_source_seq, CommittedSeq},
+        {progress, Progress}
+    ]).
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couch_replicator/src/couch_replicator.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator.hrl b/src/couch_replicator/src/couch_replicator.hrl
new file mode 100644
index 0000000..20c0bc3
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator.hrl
@@ -0,0 +1,30 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% Current version of the replication id scheme; stored in checkpoint
+% documents as replication_id_version. Older versions are tried when
+% looking up existing replication logs.
+-define(REP_ID_VERSION, 2).
+
+% A replication request.
+-record(rep, {
+ id, % {BaseId, Extension}; BaseId ++ Extension names the job
+ source, % source endpoint, passed to couch_api_wrap:db_open
+ target, % target endpoint, passed to couch_api_wrap:db_open
+ options, % proplist of replication options
+ user_ctx, % #user_ctx{} of the user who requested the replication
+ doc_id % NOTE(review): presumably the _replicator document id, if any -- confirm
+}).
+
+% Counters accumulated from worker reports and written to checkpoints.
+-record(rep_stats, {
+ missing_checked = 0,
+ missing_found = 0,
+ docs_read = 0,
+ docs_written = 0,
+ doc_write_failures = 0
+}).
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couch_replicator/src/couch_replicator_js_functions.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_js_functions.hrl b/src/couch_replicator/src/couch_replicator_js_functions.hrl
new file mode 100644
index 0000000..3f1db7c
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_js_functions.hrl
@@ -0,0 +1,151 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% JavaScript validate_doc_update function for the replicator database,
+% embedded as a binary (presumably evaluated by the JS query server, so any
+% edit to the text between the quotes changes runtime behaviour).
+% It rejects documents whose source/target, option fields or user_ctx are
+% malformed. Only users with the _replicator role may edit documents in the
+% `triggered' state. Non-admins may only use their own name and roles in
+% user_ctx, and may only delete documents whose user_ctx.name is their own.
+% NOTE(review): in the deletion branch oldDoc.user_ctx is read without
+% checking that oldDoc exists. If a deleted document can be written with no
+% previous revision, this throws a TypeError instead of a forbidden error.
+% Confirm, and guard with `!oldDoc ||` if so.
+-define(REP_DB_DOC_VALIDATE_FUN, <<"
+ function(newDoc, oldDoc, userCtx) {
+ function reportError(error_msg) {
+ log('Error writing document `' + newDoc._id +
+ '\\' to the replicator database: ' + error_msg);
+ throw({forbidden: error_msg});
+ }
+
+ function validateEndpoint(endpoint, fieldName) {
+ if ((typeof endpoint !== 'string') &&
+ ((typeof endpoint !== 'object') || (endpoint === null))) {
+
+ reportError('The `' + fieldName + '\\' property must exist' +
+ ' and be either a string or an object.');
+ }
+
+ if (typeof endpoint === 'object') {
+ if ((typeof endpoint.url !== 'string') || !endpoint.url) {
+ reportError('The url property must exist in the `' +
+ fieldName + '\\' field and must be a non-empty string.');
+ }
+
+ if ((typeof endpoint.auth !== 'undefined') &&
+ ((typeof endpoint.auth !== 'object') ||
+ endpoint.auth === null)) {
+
+ reportError('`' + fieldName +
+ '.auth\\' must be a non-null object.');
+ }
+
+ if ((typeof endpoint.headers !== 'undefined') &&
+ ((typeof endpoint.headers !== 'object') ||
+ endpoint.headers === null)) {
+
+ reportError('`' + fieldName +
+ '.headers\\' must be a non-null object.');
+ }
+ }
+ }
+
+ var isReplicator = (userCtx.roles.indexOf('_replicator') >= 0);
+ var isAdmin = (userCtx.roles.indexOf('_admin') >= 0);
+
+ if (oldDoc && !newDoc._deleted && !isReplicator &&
+ (oldDoc._replication_state === 'triggered')) {
+ reportError('Only the replicator can edit replication documents ' +
+ 'that are in the triggered state.');
+ }
+
+ if (!newDoc._deleted) {
+ validateEndpoint(newDoc.source, 'source');
+ validateEndpoint(newDoc.target, 'target');
+
+ if ((typeof newDoc.create_target !== 'undefined') &&
+ (typeof newDoc.create_target !== 'boolean')) {
+
+ reportError('The `create_target\\' field must be a boolean.');
+ }
+
+ if ((typeof newDoc.continuous !== 'undefined') &&
+ (typeof newDoc.continuous !== 'boolean')) {
+
+ reportError('The `continuous\\' field must be a boolean.');
+ }
+
+ if ((typeof newDoc.doc_ids !== 'undefined') &&
+ !isArray(newDoc.doc_ids)) {
+
+ reportError('The `doc_ids\\' field must be an array of strings.');
+ }
+
+ if ((typeof newDoc.filter !== 'undefined') &&
+ ((typeof newDoc.filter !== 'string') || !newDoc.filter)) {
+
+ reportError('The `filter\\' field must be a non-empty string.');
+ }
+
+ if ((typeof newDoc.query_params !== 'undefined') &&
+ ((typeof newDoc.query_params !== 'object') ||
+ newDoc.query_params === null)) {
+
+ reportError('The `query_params\\' field must be an object.');
+ }
+
+ if (newDoc.user_ctx) {
+ var user_ctx = newDoc.user_ctx;
+
+ if ((typeof user_ctx !== 'object') || (user_ctx === null)) {
+ reportError('The `user_ctx\\' property must be a ' +
+ 'non-null object.');
+ }
+
+ if (!(user_ctx.name === null ||
+ (typeof user_ctx.name === 'undefined') ||
+ ((typeof user_ctx.name === 'string') &&
+ user_ctx.name.length > 0))) {
+
+ reportError('The `user_ctx.name\\' property must be a ' +
+ 'non-empty string or null.');
+ }
+
+ if (!isAdmin && (user_ctx.name !== userCtx.name)) {
+ reportError('The given `user_ctx.name\\' is not valid');
+ }
+
+ if (user_ctx.roles && !isArray(user_ctx.roles)) {
+ reportError('The `user_ctx.roles\\' property must be ' +
+ 'an array of strings.');
+ }
+
+ if (!isAdmin && user_ctx.roles) {
+ for (var i = 0; i < user_ctx.roles.length; i++) {
+ var role = user_ctx.roles[i];
+
+ if (typeof role !== 'string' || role.length === 0) {
+ reportError('Roles must be non-empty strings.');
+ }
+ if (userCtx.roles.indexOf(role) === -1) {
+ reportError('Invalid role (`' + role +
+ '\\') in the `user_ctx\\'');
+ }
+ }
+ }
+ } else {
+ if (!isAdmin) {
+ reportError('The `user_ctx\\' property is missing (it is ' +
+ 'optional for admins only).');
+ }
+ }
+ } else {
+ if (!isAdmin) {
+ if (!oldDoc.user_ctx || (oldDoc.user_ctx.name !== userCtx.name)) {
+ reportError('Replication documents can only be deleted by ' +
+ 'admins or by the users who created them.');
+ }
+ }
+ }
+ }
+">>).
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couch_replicator/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_utils.erl b/src/couch_replicator/src/couch_replicator_utils.erl
new file mode 100644
index 0000000..6cc4db8
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_utils.erl
@@ -0,0 +1,382 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_utils).
+
+-export([parse_rep_doc/2]).
+-export([open_db/1, close_db/1]).
+-export([start_db_compaction_notifier/2, stop_db_compaction_notifier/1]).
+-export([replication_id/2]).
+-export([sum_stats/2]).
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+-include("couch_replicator.hrl").
+-include("../ibrowse/ibrowse.hrl").
+
+-import(couch_util, [
+ get_value/2,
+ get_value/3
+]).
+
+
+%% @doc Turns a replication request body ({Props} EJSON) into a #rep{} record.
+%% A cancel request that already names the replication id only needs its
+%% options, so in that case source/target are not parsed and no id is built.
+parse_rep_doc({Props}, UserCtx) ->
+    Proxy = parse_proxy_params(get_value(<<"proxy">>, Props, <<>>)),
+    Opts = make_options(Props),
+    CancelById = get_value(cancel, Opts, false) andalso
+        (get_value(id, Opts, nil) =/= nil),
+    if
+        CancelById ->
+            {ok, #rep{options = Opts, user_ctx = UserCtx}};
+        true ->
+            Src = parse_rep_db(get_value(<<"source">>, Props), Proxy, Opts),
+            Tgt = parse_rep_db(get_value(<<"target">>, Props), Proxy, Opts),
+            Rep0 = #rep{
+                source = Src,
+                target = Tgt,
+                options = Opts,
+                user_ctx = UserCtx,
+                doc_id = get_value(<<"_id">>, Props)
+            },
+            {ok, Rep0#rep{id = replication_id(Rep0)}}
+    end.
+
+
+%% @doc Full replication ID: {BaseId, Extension}. BaseId is the hex hash
+%% produced by the current ?REP_ID_VERSION algorithm; Extension holds a
+%% "+continuous" and/or "+create_target" suffix for each enabled option.
+replication_id(#rep{options = Opts} = Rep) ->
+    Base = replication_id(Rep, ?REP_ID_VERSION),
+    Ext = maybe_append_options([continuous, create_target], Opts),
+    {Base, Ext}.
+
+
+% Versioned clauses for generating replication IDs.
+% If a change is made to how replications are identified,
+% please add a new clause and increase ?REP_ID_VERSION.
+
+% Version 2 (current): hashes the host name, the HTTP port, both endpoints
+% and any filter/doc_ids information. The port comes from the running
+% couch_httpd listener, or from the [httpd] port setting when the listener is
+% not available. Including the port gives different IDs to nodes on the same
+% host that listen on different ports.
+replication_id(#rep{user_ctx = UserCtx} = Rep, 2) ->
+ {ok, HostName} = inet:gethostname(),
+ Port = case (catch mochiweb_socket_server:get(couch_httpd, port)) of
+ P when is_number(P) ->
+ P;
+ _ ->
+ % On restart we might be called before the couch_httpd process is
+ % started.
+ % TODO: we might be under an SSL socket server only, or both under
+ % SSL and a non-SSL socket.
+ % ... mochiweb_socket_server:get(https, port)
+ list_to_integer(couch_config:get("httpd", "port", "5984"))
+ end,
+ Src = get_rep_endpoint(UserCtx, Rep#rep.source),
+ Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
+ maybe_append_filters([HostName, Port, Src, Tgt], Rep);
+
+% Version 1: same inputs as version 2, but without the port.
+replication_id(#rep{user_ctx = UserCtx} = Rep, 1) ->
+ {ok, HostName} = inet:gethostname(),
+ Src = get_rep_endpoint(UserCtx, Rep#rep.source),
+ Tgt = get_rep_endpoint(UserCtx, Rep#rep.target),
+ maybe_append_filters([HostName, Src, Tgt], Rep).
+
+
+% Returns the hex MD5 of term_to_binary(Base ++ Extra), where Extra is:
+% - [FilterSourceCode, QueryParams] when a filter is set (QueryParams
+% defaults to {[]});
+% - [DocIds] when doc_ids is set and no filter is;
+% - [] otherwise.
+% Changing any hashed term changes the resulting replication ID. Per the note
+% on the versioned replication_id/2 clauses, do that through a new version.
+maybe_append_filters(Base,
+ #rep{source = Source, user_ctx = UserCtx, options = Options}) ->
+ Base2 = Base ++
+ case get_value(filter, Options) of
+ undefined ->
+ case get_value(doc_ids, Options) of
+ undefined ->
+ [];
+ DocIds ->
+ [DocIds]
+ end;
+ Filter ->
+ [filter_code(Filter, Source, UserCtx),
+ get_value(query_params, Options, {[]})]
+ end,
+ couch_util:to_hex(couch_util:md5(term_to_binary(Base2))).
+
+
+% Fetches the source code of the filter named by Filter ("ddocname/filtername"
+% binary) from the design document _design/ddocname in Source. The source is
+% opened with UserCtx and always closed again.
+% Throws {error, Message} when the filter name is malformed or when the
+% database or design document cannot be opened.
+filter_code(Filter, Source, UserCtx) ->
+ {DDocName, FilterName} =
+ % The lazy first group splits on the first "/" only.
+ case re:run(Filter, "(.*?)/(.*)", [{capture, [1, 2], binary}]) of
+ {match, [DDocName0, FilterName0]} ->
+ {DDocName0, FilterName0};
+ _ ->
+ throw({error, <<"Invalid filter. Must match `ddocname/filtername`.">>})
+ end,
+ Db = case (catch couch_api_wrap:db_open(Source, [{user_ctx, UserCtx}])) of
+ {ok, Db0} ->
+ Db0;
+ DbError ->
+ DbErrorMsg = io_lib:format("Could not open source database `~s`: ~s",
+ [couch_api_wrap:db_uri(Source), couch_util:to_binary(DbError)]),
+ throw({error, iolist_to_binary(DbErrorMsg)})
+ end,
+ try
+ Body = case (catch couch_api_wrap:open_doc(
+ Db, <<"_design/", DDocName/binary>>, [ejson_body])) of
+ {ok, #doc{body = Body0}} ->
+ Body0;
+ DocError ->
+ DocErrorMsg = io_lib:format(
+ "Couldn't open document `_design/~s` from source "
+ "database `~s`: ~s", [DDocName, couch_api_wrap:db_uri(Source),
+ couch_util:to_binary(DocError)]),
+ throw({error, iolist_to_binary(DocErrorMsg)})
+ end,
+ Code = couch_util:get_nested_json_value(
+ Body, [<<"filters">>, FilterName]),
+ % Intended as a trim. Note that "\s" inside an Erlang string literal is a
+ % plain space, so the pattern is "^ *(.*?) *$": it strips spaces only,
+ % and because "." does not match newlines, multi-line code does not
+ % match and is returned unchanged. The result feeds the replication ID
+ % hash, so changing the pattern would change replication IDs.
+ re:replace(Code, [$^, "\s*(.*?)\s*", $$], "\\1", [{return, binary}])
+ after
+ couch_api_wrap:db_close(Db)
+ end.
+
+
+%% @doc Concatenates "+Name" for every option in Options whose value in
+%% RepOptions is true (absent options count as false), preserving the order
+%% of Options. Any non-boolean value raises a case_clause error.
+maybe_append_options(Options, RepOptions) ->
+    Suffix = fun(Name) ->
+        case get_value(Name, RepOptions, false) of
+            true ->
+                "+" ++ atom_to_list(Name);
+            false ->
+                ""
+        end
+    end,
+    lists:append([Suffix(Name) || Name <- Options]).
+
+
+%% @doc Normalised endpoint description used as replication ID input.
+%% Remote: {remote, Url, NonDefaultHeaders} or, with OAuth credentials,
+%% {remote, Url, NonDefaultHeaders, OAuth}. Local: {local, DbName, UserCtx}.
+get_rep_endpoint(_UserCtx, #httpdb{url = Url, headers = Headers, oauth = nil}) ->
+    {remote, Url, Headers -- (#httpdb{})#httpdb.headers};
+get_rep_endpoint(_UserCtx,
+        #httpdb{url = Url, headers = Headers, oauth = #oauth{} = OAuth}) ->
+    {remote, Url, Headers -- (#httpdb{})#httpdb.headers, OAuth};
+get_rep_endpoint(UserCtx, <<DbName/binary>>) ->
+    {local, DbName, UserCtx}.
+
+
+% Converts a replication endpoint into the form the replicator works with.
+% An EJSON object {Props} (url, optional auth/headers) or an http:// or
+% https:// URL binary becomes an #httpdb{} record. Any other binary is
+% treated as a local database name and returned unchanged.
+% User headers are merged over the #httpdb{} defaults; lists:ukeymerge/3
+% keeps the entry from its first list, so user values win. Timeouts,
+% connection counts, retries and socket options are taken from Options
+% (see make_options/1).
+parse_rep_db({Props}, ProxyParams, Options) ->
+ Url = maybe_add_trailing_slash(get_value(<<"url">>, Props)),
+ {AuthProps} = get_value(<<"auth">>, Props, {[]}),
+ {BinHeaders} = get_value(<<"headers">>, Props, {[]}),
+ Headers = lists:ukeysort(1, [{?b2l(K), ?b2l(V)} || {K, V} <- BinHeaders]),
+ DefaultHeaders = (#httpdb{})#httpdb.headers,
+ % All four OAuth credential fields are converted with ?b2l, so a missing
+ % one presumably fails with badarg. An unknown signature_method raises
+ % case_clause.
+ OAuth = case get_value(<<"oauth">>, AuthProps) of
+ undefined ->
+ nil;
+ {OauthProps} ->
+ #oauth{
+ consumer_key = ?b2l(get_value(<<"consumer_key">>, OauthProps)),
+ token = ?b2l(get_value(<<"token">>, OauthProps)),
+ token_secret = ?b2l(get_value(<<"token_secret">>, OauthProps)),
+ consumer_secret = ?b2l(get_value(<<"consumer_secret">>, OauthProps)),
+ signature_method =
+ case get_value(<<"signature_method">>, OauthProps) of
+ undefined -> hmac_sha1;
+ <<"PLAINTEXT">> -> plaintext;
+ <<"HMAC-SHA1">> -> hmac_sha1;
+ <<"RSA-SHA1">> -> rsa_sha1
+ end
+ }
+ end,
+ #httpdb{
+ url = Url,
+ oauth = OAuth,
+ headers = lists:ukeymerge(1, Headers, DefaultHeaders),
+ ibrowse_options = lists:keysort(1,
+ [{socket_options, get_value(socket_options, Options)} |
+ ProxyParams ++ ssl_params(Url)]),
+ timeout = get_value(connection_timeout, Options),
+ http_connections = get_value(http_connections, Options),
+ retries = get_value(retries, Options)
+ };
+parse_rep_db(<<"http://", _/binary>> = Url, ProxyParams, Options) ->
+ parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
+parse_rep_db(<<"https://", _/binary>> = Url, ProxyParams, Options) ->
+ parse_rep_db({[{<<"url">>, Url}]}, ProxyParams, Options);
+parse_rep_db(<<DbName/binary>>, _ProxyParams, _Options) ->
+ DbName.
+
+
+%% @doc Returns Url (binary or string) as a string that ends with "/".
+%% An empty URL is not accepted: lists:last/1 has no clause for [].
+maybe_add_trailing_slash(Url) when is_binary(Url) ->
+    maybe_add_trailing_slash(?b2l(Url));
+maybe_add_trailing_slash(Url) ->
+    LastChar = lists:last(Url),
+    if
+        LastChar =:= $/ -> Url;
+        true -> Url ++ "/"
+    end.
+
+
+%% @doc Builds the key-sorted option list for a replication: the options
+%% given in the request (see convert_options/1) take precedence over the
+%% [replicator] configuration defaults, which fill in anything missing.
+make_options(Props) ->
+    UserOpts = lists:ukeysort(1, convert_options(Props)),
+    Cfg = fun(Key, Default) -> couch_config:get("replicator", Key, Default) end,
+    WorkersStr = Cfg("worker_processes", "4"),
+    BatchSizeStr = Cfg("worker_batch_size", "500"),
+    ConnsStr = Cfg("http_connections", "20"),
+    TimeoutStr = Cfg("connection_timeout", "30000"),
+    RetriesStr = Cfg("retries_per_request", "10"),
+    {ok, SocketOpts} = couch_util:parse_term(
+        Cfg("socket_options", "[{keepalive, true}, {nodelay, false}]")),
+    Defaults = lists:keysort(1, [
+        {connection_timeout, list_to_integer(TimeoutStr)},
+        {retries, list_to_integer(RetriesStr)},
+        {http_connections, list_to_integer(ConnsStr)},
+        {socket_options, SocketOpts},
+        {worker_batch_size, list_to_integer(BatchSizeStr)},
+        {worker_processes, list_to_integer(WorkersStr)}
+    ]),
+    % ukeymerge keeps the tuple from its first list on key collisions.
+    lists:ukeymerge(1, UserOpts, Defaults).
+
+
+%% @doc Maps the EJSON properties of a replication request to internal
+%% {Atom, Value} options, preserving their order. Unknown properties are
+%% dropped.
+convert_options(Props) ->
+    lists:flatmap(fun convert_option/1, Props).
+
+%% Translates a single property; returns [] for properties that are ignored.
+convert_option({<<"cancel">>, V}) ->
+    [{cancel, V}];
+convert_option({IdOpt, V}) when IdOpt =:= <<"_local_id">>;
+        IdOpt =:= <<"replication_id">>; IdOpt =:= <<"id">> ->
+    % Split "BaseId+ext" into {BaseId, "+ext"}, the shape replication_id/1 builds.
+    [{id, lists:splitwith(fun(Char) -> Char =/= $+ end, ?b2l(V))}];
+convert_option({<<"create_target">>, V}) ->
+    [{create_target, V}];
+convert_option({<<"continuous">>, V}) ->
+    [{continuous, V}];
+convert_option({<<"filter">>, V}) ->
+    [{filter, V}];
+convert_option({<<"query_params">>, V}) ->
+    [{query_params, V}];
+convert_option({<<"doc_ids">>, null}) ->
+    [];
+convert_option({<<"doc_ids">>, V}) ->
+    % Ensure same behaviour as old replicator: accept a list of percent
+    % encoded doc IDs.
+    [{doc_ids, [?l2b(couch_httpd:unquote(DocId)) || DocId <- V]}];
+convert_option({<<"worker_processes">>, V}) ->
+    [{worker_processes, couch_util:to_integer(V)}];
+convert_option({<<"worker_batch_size">>, V}) ->
+    [{worker_batch_size, couch_util:to_integer(V)}];
+convert_option({<<"http_connections">>, V}) ->
+    [{http_connections, couch_util:to_integer(V)}];
+convert_option({<<"connection_timeout">>, V}) ->
+    [{connection_timeout, couch_util:to_integer(V)}];
+convert_option({<<"retries_per_request">>, V}) ->
+    [{retries, couch_util:to_integer(V)}];
+convert_option({<<"socket_options">>, V}) ->
+    {ok, SocketOptions} = couch_util:parse_term(V),
+    [{socket_options, SocketOptions}];
+convert_option({<<"since_seq">>, V}) ->
+    [{since_seq, V}];
+convert_option(_Unknown) ->
+    [].
+
+
+%% @doc Converts a proxy URL (binary or string) into ibrowse proxy options.
+%% An empty URL means "no proxy". Credentials are included only when the URL
+%% carries both a user name and a password.
+parse_proxy_params(ProxyUrl) when is_binary(ProxyUrl) ->
+    parse_proxy_params(?b2l(ProxyUrl));
+parse_proxy_params([]) ->
+    [];
+parse_proxy_params(ProxyUrl) ->
+    #url{host = Host, port = Port, username = User, password = Passwd} =
+        ibrowse_lib:parse_url(ProxyUrl),
+    HostPort = [{proxy_host, Host}, {proxy_port, Port}],
+    if
+        is_list(User), is_list(Passwd) ->
+            HostPort ++ [{proxy_user, User}, {proxy_password, Passwd}];
+        true ->
+            HostPort
+    end.
+
+
+%% @doc ibrowse SSL options for Url: [] for http URLs. For https URLs the
+%% result is [{is_ssl, true}, {ssl_options, Opts}], where Opts holds the
+%% verification depth and mode plus the client certificate/key (and key
+%% password) when both cert_file and key_file are configured.
+ssl_params(Url) ->
+    case ibrowse_lib:parse_url(Url) of
+        #url{protocol = http} ->
+            [];
+        #url{protocol = https} ->
+            Depth = list_to_integer(
+                couch_config:get("replicator", "ssl_certificate_max_depth", "3")),
+            VerifyCerts = couch_config:get("replicator", "verify_ssl_certificates"),
+            CertFile = couch_config:get("replicator", "cert_file", nil),
+            KeyFile = couch_config:get("replicator", "key_file", nil),
+            Password = couch_config:get("replicator", "password", nil),
+            VerifyOpts = [{depth, Depth} | ssl_verify_options(VerifyCerts =:= "true")],
+            HasClientCert = (CertFile /= nil) andalso (KeyFile /= nil),
+            SslOpts = if
+                not HasClientCert ->
+                    VerifyOpts;
+                Password =:= nil ->
+                    [{certfile, CertFile}, {keyfile, KeyFile} | VerifyOpts];
+                true ->
+                    [{certfile, CertFile}, {keyfile, KeyFile},
+                        {password, Password} | VerifyOpts]
+            end,
+            [{is_ssl, true}, {ssl_options, SslOpts}]
+    end.
+
+%% @doc SSL peer-verification options for the running Erlang/OTP release.
+%% With Value = true the peer is verified against the CA file configured as
+%% [replicator] ssl_trusted_certificates_file; with false nothing is checked.
+ssl_verify_options(Value) ->
+    ssl_verify_options(Value, erlang:system_info(otp_release)).
+
+%% OTP R14 and later use the atom forms verify_peer/verify_none; older
+%% releases use the integer forms 2/0.
+ssl_verify_options(true, OTPVersion) ->
+    CAFile = couch_config:get("replicator", "ssl_trusted_certificates_file"),
+    case otp_has_atom_ssl_verify(OTPVersion) of
+        true ->
+            [{verify, verify_peer}, {cacertfile, CAFile}];
+        false ->
+            [{verify, 2}, {cacertfile, CAFile}]
+    end;
+ssl_verify_options(false, OTPVersion) ->
+    case otp_has_atom_ssl_verify(OTPVersion) of
+        true ->
+            [{verify, verify_none}];
+        false ->
+            [{verify, 0}]
+    end.
+
+%% True for OTP R14 and newer. Up to R16 the release string starts with "R"
+%% and compares correctly as a string. From OTP 17 on it is a plain number
+%% ("17", "18", ...), which sorts below "R14" as a string, so any
+%% non-"R" release is treated as new.
+otp_has_atom_ssl_verify("R" ++ _ = OTPVersion) ->
+    OTPVersion >= "R14";
+otp_has_atom_ssl_verify(_OTPVersion) ->
+    true.
+
+
+%% @doc (Re)opens a local #db{} with its own user context and options,
+%% returning the opened handle. Remote endpoints are returned unchanged.
+open_db(#db{} = Db) ->
+    #db{name = DbName, user_ctx = Ctx, options = DbOpts} = Db,
+    {ok, Opened} = couch_db:open(DbName, [{user_ctx, Ctx} | DbOpts]),
+    Opened;
+open_db(Other) ->
+    Other.
+
+
+%% @doc Closes a local #db{}; a no-op (returning ok) for remote endpoints.
+close_db(Db) when is_record(Db, db) ->
+    couch_db:close(Db);
+close_db(_NotLocal) ->
+    ok.
+
+
+%% @doc For a local #db{}, starts a linked update notifier that casts
+%% {db_compacted, DbName} to Server whenever that database is compacted, and
+%% returns the notifier pid. Returns nil for anything else.
+start_db_compaction_notifier(#db{name = DbName}, Server) ->
+    OnEvent = fun
+        ({compacted, Name}) when Name =:= DbName ->
+            ok = gen_server:cast(Server, {db_compacted, DbName});
+        (_OtherEvent) ->
+            ok
+    end,
+    {ok, Notifier} = couch_db_update_notifier:start_link(OnEvent),
+    Notifier;
+start_db_compaction_notifier(_Db, _Server) ->
+    nil.
+
+
+%% @doc Stops a notifier started by start_db_compaction_notifier/2; nil
+%% (no notifier) is accepted and ignored.
+stop_db_compaction_notifier(Notifier) when Notifier =/= nil ->
+    couch_db_update_notifier:stop(Notifier);
+stop_db_compaction_notifier(nil) ->
+    ok.
+
+
+%% @doc Field-wise sum of two #rep_stats{} records.
+sum_stats(#rep_stats{missing_checked = Checked1, missing_found = Found1,
+                     docs_read = Read1, docs_written = Written1,
+                     doc_write_failures = Failed1},
+          #rep_stats{missing_checked = Checked2, missing_found = Found2,
+                     docs_read = Read2, docs_written = Written2,
+                     doc_write_failures = Failed2}) ->
+    #rep_stats{
+        missing_checked = Checked1 + Checked2,
+        missing_found = Found1 + Found2,
+        docs_read = Read1 + Read2,
+        docs_written = Written1 + Written2,
+        doc_write_failures = Failed1 + Failed2
+    }.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couch_replicator/src/couch_replicator_worker.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator/src/couch_replicator_worker.erl b/src/couch_replicator/src/couch_replicator_worker.erl
new file mode 100644
index 0000000..aa04dc5
--- /dev/null
+++ b/src/couch_replicator/src/couch_replicator_worker.erl
@@ -0,0 +1,515 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_replicator_worker).
+-behaviour(gen_server).
+
+% public API
+-export([start_link/5]).
+
+% gen_server callbacks
+-export([init/1, terminate/2, code_change/3]).
+-export([handle_call/3, handle_cast/2, handle_info/2]).
+
+-include("couch_db.hrl").
+-include("couch_api_wrap.hrl").
+-include("couch_replicator.hrl").
+
+% TODO: maybe make both buffer max sizes configurable
+-define(DOC_BUFFER_BYTE_SIZE, 512 * 1024). % for remote targets
+-define(DOC_BUFFER_LEN, 10). % for local targets, # of documents
+% Documents with more attachments than MAX_BULK_ATTS_PER_DOC, with any
+% attachment whose disk_len exceeds MAX_BULK_ATT_SIZE, or with stub
+% attachments are not batched; batch_doc/1 sends them to be written alone.
+-define(MAX_BULK_ATT_SIZE, 64 * 1024).
+-define(MAX_BULK_ATTS_PER_DOC, 8).
+% Minimum interval between periodic stats reports (maybe_report_stats/2).
+-define(STATS_DELAY, 10000000). % 10 seconds (in microseconds)
+
+% Adds Inc to the counter at tuple position StatPos (e.g.
+% #rep_stats.docs_read) of the record Stats.
+-define(inc_stat(StatPos, Stats, Inc),
+ setelement(StatPos, Stats, element(StatPos, Stats) + Inc)).
+
+-import(couch_replicator_utils, [
+ open_db/1,
+ close_db/1,
+ start_db_compaction_notifier/2,
+ stop_db_compaction_notifier/1
+]).
+-import(couch_util, [
+ to_binary/1,
+ get_value/2,
+ get_value/3
+]).
+
+
+% Documents waiting to be written in bulk. For remote targets docs are
+% JSON-encoded and size counts bytes; for local targets docs are #doc{}
+% records and size counts documents (see maybe_flush_docs/3).
+-record(batch, {
+ docs = [],
+ size = 0
+}).
+
+% gen_server state; only used when the source is remote (see start_link/5).
+-record(state, {
+ cp, % process that receives {add_stats, _} and {report_seq_done, ...} calls
+ loop, % pid of the queue_fetch_loop/5 process
+ max_parallel_conns, % maximum number of concurrent doc readers
+ source,
+ target,
+ readers = [], % pids of running doc readers
+ writer = nil, % pid of the running batch writer, or nil
+ pending_fetch = nil, % {From, FetchParams} waiting for a free reader slot
+ flush_waiter = nil, % From of the outstanding flush call, or nil
+ stats = #rep_stats{},
+ source_db_compaction_notifier = nil,
+ target_db_compaction_notifier = nil,
+ batch = #batch{}
+}).
+
+
+
+%% @doc Starts a replication worker.
+%% With a local (#db{}) source no gen_server is created: a linked process
+%% runs the fetch loop directly and reports to Cp itself, and MaxConns is
+%% unused. Any other source gets a gen_server that manages parallel doc
+%% readers and a batch writer.
+start_link(Cp, Source, Target, ChangesManager, MaxConns) ->
+    case Source of
+        #db{} ->
+            LoopFun = fun() ->
+                erlang:put(last_stats_report, now()),
+                queue_fetch_loop(Source, Target, Cp, Cp, ChangesManager)
+            end,
+            {ok, spawn_link(LoopFun)};
+        _ ->
+            gen_server:start_link(
+                ?MODULE, {Cp, Source, Target, ChangesManager, MaxConns}, [])
+    end.
+
+
+%% @doc gen_server init: traps exits so reader/writer/loop terminations
+%% arrive as 'EXIT' messages, spawns the linked fetch loop, opens both
+%% endpoints and starts compaction notifiers for local databases.
+init({Cp, Source, Target, ChangesManager, MaxConns}) ->
+    process_flag(trap_exit, true),
+    Self = self(),
+    Loop = spawn_link(fun() ->
+        queue_fetch_loop(Source, Target, Self, Cp, ChangesManager)
+    end),
+    erlang:put(last_stats_report, now()),
+    SrcDb = open_db(Source),
+    TgtDb = open_db(Target),
+    SrcNotifier = start_db_compaction_notifier(Source, Self),
+    TgtNotifier = start_db_compaction_notifier(Target, Self),
+    {ok, #state{
+        cp = Cp,
+        max_parallel_conns = MaxConns,
+        loop = Loop,
+        source = SrcDb,
+        target = TgtDb,
+        source_db_compaction_notifier = SrcNotifier,
+        target_db_compaction_notifier = TgtNotifier
+    }}.
+
+
+% {fetch_doc, Params}: accepted only from the fetch loop (From's pid must be
+% the loop pid) and only while no fetch is parked. Spawns a doc reader if
+% fewer than max_parallel_conns are running. Otherwise the request is parked
+% in pending_fetch without a reply, which blocks the loop until a reader
+% exits (handled in handle_info/2).
+handle_call({fetch_doc, {_Id, _Revs, _PAs} = Params}, {Pid, _} = From,
+ #state{loop = Pid, readers = Readers, pending_fetch = nil,
+ source = Src, target = Tgt, max_parallel_conns = MaxConns} = State) ->
+ case length(Readers) of
+ Size when Size < MaxConns ->
+ Reader = spawn_doc_reader(Src, Tgt, Params),
+ NewState = State#state{
+ readers = [Reader | Readers]
+ },
+ {reply, ok, NewState};
+ _ ->
+ NewState = State#state{
+ pending_fetch = {From, Params}
+ },
+ {noreply, NewState}
+ end;
+
+% {batch_doc, Doc}: a reader hands over an attachment-free document. The
+% reply is sent before the (possible) batch flush, presumably so the reader
+% is not held up by the write.
+handle_call({batch_doc, Doc}, From, State) ->
+ gen_server:reply(From, ok),
+ {noreply, maybe_flush_docs(Doc, State)};
+
+% {add_stats, IncStats}: merge stats reported by readers or the writer and
+% possibly forward them to the checkpointer.
+handle_call({add_stats, IncStats}, From, #state{stats = Stats} = State) ->
+ gen_server:reply(From, ok),
+ NewStats = couch_replicator_utils:sum_stats(Stats, IncStats),
+ NewStats2 = maybe_report_stats(State#state.cp, NewStats),
+ {noreply, State#state{stats = NewStats2}};
+
+% flush: sent by the fetch loop after it has issued every fetch for a changes
+% batch. The writer starts now only if no reader is running; otherwise the
+% last exiting reader starts it (handle_info/2). The {ok, Stats} reply is
+% sent by after_full_flush/1 once the writer exits.
+handle_call(flush, {Pid, _} = From,
+ #state{loop = Pid, writer = nil, flush_waiter = nil,
+ target = Target, batch = Batch} = State) ->
+ State2 = case State#state.readers of
+ [] ->
+ State#state{writer = spawn_writer(Target, Batch)};
+ _ ->
+ State
+ end,
+ {noreply, State2#state{flush_waiter = From}}.
+
+
+% A compaction notifier (started in init/1 for local databases) reports that
+% the source or target was compacted. The #db{} handle is refreshed with
+% couch_db:reopen/1. Any other cast stops the server.
+handle_cast({db_compacted, DbName},
+ #state{source = #db{name = DbName} = Source} = State) ->
+ {ok, NewSource} = couch_db:reopen(Source),
+ {noreply, State#state{source = NewSource}};
+
+handle_cast({db_compacted, DbName},
+ #state{target = #db{name = DbName} = Target} = State) ->
+ {ok, NewTarget} = couch_db:reopen(Target),
+ {noreply, State#state{target = NewTarget}};
+
+handle_cast(Msg, State) ->
+ {stop, {unexpected_async_call, Msg}, State}.
+
+
+% The fetch loop finished normally. The match asserts that no work is left
+% (empty batch, no readers, writer or parked requests) and crashes with
+% badmatch otherwise; on success the worker stops normally.
+handle_info({'EXIT', Pid, normal}, #state{loop = Pid} = State) ->
+ #state{
+ batch = #batch{docs = []}, readers = [], writer = nil,
+ pending_fetch = nil, flush_waiter = nil
+ } = State,
+ {stop, normal, State};
+
+% The writer finished: the flush is complete, so answer the flush waiter.
+handle_info({'EXIT', Pid, normal}, #state{writer = Pid} = State) ->
+ {noreply, after_full_flush(State)};
+
+% A linked process exited normally while no writer is running. If it was one
+% of the readers, it is dropped, and then one of two things happens: a parked
+% fetch is started in the freed slot and the loop unblocked, or, if a flush
+% is waiting and no readers remain, the writer is spawned. Exits of unknown
+% pids are ignored. (Writer is always nil in this clause.)
+handle_info({'EXIT', Pid, normal}, #state{writer = nil} = State) ->
+ #state{
+ readers = Readers, writer = Writer, batch = Batch,
+ source = Source, target = Target,
+ pending_fetch = Fetch, flush_waiter = FlushWaiter
+ } = State,
+ case Readers -- [Pid] of
+ Readers ->
+ {noreply, State};
+ Readers2 ->
+ State2 = case Fetch of
+ nil ->
+ case (FlushWaiter =/= nil) andalso (Writer =:= nil) andalso
+ (Readers2 =:= []) of
+ true ->
+ State#state{
+ readers = Readers2,
+ writer = spawn_writer(Target, Batch)
+ };
+ false ->
+ State#state{readers = Readers2}
+ end;
+ {From, FetchParams} ->
+ Reader = spawn_doc_reader(Source, Target, FetchParams),
+ gen_server:reply(From, ok),
+ State#state{
+ readers = [Reader | Readers2],
+ pending_fetch = nil
+ }
+ end,
+ {noreply, State2}
+ end;
+
+% Any other exit (abnormal, or a normal exit no clause above accepted) stops
+% the worker.
+handle_info({'EXIT', Pid, Reason}, State) ->
+ {stop, {process_died, Pid, Reason}, State}.
+
+
+%% @doc Releases the endpoints and compaction notifiers held by the worker.
+terminate(_Reason, #state{source = Src, target = Tgt,
+                          source_db_compaction_notifier = SrcNotifier,
+                          target_db_compaction_notifier = TgtNotifier}) ->
+    close_db(Src),
+    close_db(Tgt),
+    stop_db_compaction_notifier(SrcNotifier),
+    stop_db_compaction_notifier(TgtNotifier).
+
+
+%% @doc The state is carried over unchanged on code upgrade.
+code_change(_OldVsn, CurrentState, _Extra) ->
+    {ok, CurrentState}.
+
+
+% Main loop of a worker. It asks ChangesManager for the next batch of
+% changes, finds the revisions missing on the target and copies them, then
+% reports ReportSeq together with the batch stats to Cp. It returns once the
+% changes manager replies {closed, ChangesManager}. The receive has no
+% timeout, so the loop blocks until the manager answers.
+% Local (#db{}) source: the loop copies documents itself (Parent is Cp).
+% Remote (#httpdb{}) source: work is delegated to the Parent gen_server, and
+% the flush call blocks until the batch has been written.
+queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager) ->
+ ChangesManager ! {get_changes, self()},
+ receive
+ {closed, ChangesManager} ->
+ ok;
+ {changes, ChangesManager, Changes, ReportSeq} ->
+ Target2 = open_db(Target),
+ {IdRevs, Stats0} = find_missing(Changes, Target2),
+ case Source of
+ #db{} ->
+ Source2 = open_db(Source),
+ Stats = local_process_batch(
+ IdRevs, Cp, Source2, Target2, #batch{}, Stats0),
+ close_db(Source2);
+ #httpdb{} ->
+ ok = gen_server:call(Parent, {add_stats, Stats0}, infinity),
+ remote_process_batch(IdRevs, Parent),
+ {ok, Stats} = gen_server:call(Parent, flush, infinity)
+ end,
+ close_db(Target2),
+ ok = gen_server:call(Cp, {report_seq_done, ReportSeq, Stats}, infinity),
+ erlang:put(last_stats_report, now()),
+ ?LOG_DEBUG("Worker reported completion of seq ~p", [ReportSeq]),
+ queue_fetch_loop(Source, Target, Parent, Cp, ChangesManager)
+ end.
+
+
+% Copies documents from a local source, one {Id, Revs, PAs} entry at a time.
+% Documents accepted for batching are added to Batch through
+% maybe_flush_docs/3, which flushes whenever the batch is full. When the
+% list is exhausted, the remaining partial batch is flushed. Returns the
+% accumulated #rep_stats{}.
+local_process_batch([], _Cp, _Src, _Tgt, #batch{docs = []}, Stats) ->
+ Stats;
+
+local_process_batch([], Cp, Source, Target, #batch{docs = Docs, size = Size}, Stats) ->
+ case Target of
+ #httpdb{} ->
+ ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]);
+ #db{} ->
+ ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size])
+ end,
+ Stats2 = flush_docs(Target, Docs),
+ Stats3 = couch_replicator_utils:sum_stats(Stats, Stats2),
+ local_process_batch([], Cp, Source, Target, #batch{}, Stats3);
+
+local_process_batch([IdRevs | Rest], Cp, Source, Target, Batch, Stats) ->
+ % local_doc_handler/2 writes non-batchable docs itself and returns the
+ % batchable ones in DocList.
+ {ok, {_, DocList, Stats2, _}} = fetch_doc(
+ Source, IdRevs, fun local_doc_handler/2, {Target, [], Stats, Cp}),
+ {Batch2, Stats3} = lists:foldl(
+ fun(Doc, {Batch0, Stats0}) ->
+ {Batch1, S} = maybe_flush_docs(Target, Batch0, Doc),
+ {Batch1, couch_replicator_utils:sum_stats(Stats0, S)}
+ end,
+ {Batch, Stats2}, DocList),
+ local_process_batch(Rest, Cp, Source, Target, Batch2, Stats3).
+
+
+%% @doc Asks the Parent gen_server to fetch every missing revision, one
+%% {fetch_doc, {Id, [Rev], PAs}} call per revision. Each call blocks while
+%% all reader slots are busy. Returns ok.
+%%
+%% When the source is a remote database, we fetch a single document revision
+%% per HTTP request. This is mostly to facilitate retrying of HTTP requests
+%% due to network transient failures. It also helps not exceeding the maximum
+%% URL length allowed by proxies and Mochiweb.
+remote_process_batch(IdRevsList, Parent) ->
+    lists:foreach(
+        fun({Id, Revs, PAs}) ->
+            lists:foreach(
+                fun(Rev) ->
+                    ok = gen_server:call(
+                        Parent, {fetch_doc, {Id, [Rev], PAs}}, infinity)
+                end,
+                Revs)
+        end,
+        IdRevsList).
+
+
+%% @doc Spawns a linked reader process that fetches FetchParams from Source
+%% and passes each result to remote_doc_handler/2, which reports back to the
+%% calling worker.
+spawn_doc_reader(Source, Target, FetchParams) ->
+    Worker = self(),
+    ReaderFun = fun() ->
+        Db = open_db(Source),
+        fetch_doc(Db, FetchParams, fun remote_doc_handler/2, {Worker, Target}),
+        close_db(Db)
+    end,
+    spawn_link(ReaderFun).
+
+
+%% @doc Opens the given revisions of Id from Source and passes them to
+%% DocHandler, asking only for attachments added since the possible
+%% ancestors PAs. If a write inside the handler throws {missing_stub, _},
+%% the error is logged and the fetch is repeated without atts_since.
+fetch_doc(Source, {Id, Revs, PAs}, DocHandler, Acc) ->
+    OpenRevs = fun(Opts) ->
+        couch_api_wrap:open_doc_revs(Source, Id, Revs, Opts, DocHandler, Acc)
+    end,
+    try
+        OpenRevs([{atts_since, PAs}])
+    catch
+        throw:{missing_stub, _} ->
+            ?LOG_ERROR("Retrying fetch and update of document `~p` due to out of "
+                "sync attachment stubs. Missing revisions are: ~s",
+                [Id, couch_doc:revs_to_strs(Revs)]),
+            OpenRevs([])
+    end.
+
+
+% Document callback used with a local source; Acc is
+% {Target, DocList, Stats, Cp}. Each {ok, Doc} counts as one doc read.
+% Docs accepted by batch_doc/1 are prepended to DocList for
+% local_process_batch/6 to batch. Other docs are written immediately and
+% counted as written or failed. Results other than {ok, Doc} are skipped.
+local_doc_handler({ok, Doc}, {Target, DocList, Stats, Cp}) ->
+ Stats2 = ?inc_stat(#rep_stats.docs_read, Stats, 1),
+ case batch_doc(Doc) of
+ true ->
+ {ok, {Target, [Doc | DocList], Stats2, Cp}};
+ false ->
+ ?LOG_DEBUG("Worker flushing doc with attachments", []),
+ Target2 = open_db(Target),
+ Success = (flush_doc(Target2, Doc) =:= ok),
+ close_db(Target2),
+ Stats3 = case Success of
+ true ->
+ ?inc_stat(#rep_stats.docs_written, Stats2, 1);
+ false ->
+ ?inc_stat(#rep_stats.doc_write_failures, Stats2, 1)
+ end,
+ Stats4 = maybe_report_stats(Cp, Stats3),
+ {ok, {Target, DocList, Stats4, Cp}}
+ end;
+local_doc_handler(_, Acc) ->
+ {ok, Acc}.
+
+
+% Document callback run inside a doc reader; Acc is {Parent, Target}.
+% Attachment-free docs are handed to the Parent worker for batching. Docs
+% with attachments are written straight away and their stats sent to
+% Parent; after a failed write the callback returns {skip, Acc} (what skip
+% does is defined by couch_api_wrap:open_doc_revs/6). Results other than
+% {ok, Doc} are ignored.
+remote_doc_handler({ok, #doc{atts = []} = Doc}, {Parent, _} = Acc) ->
+ ok = gen_server:call(Parent, {batch_doc, Doc}, infinity),
+ {ok, Acc};
+remote_doc_handler({ok, Doc}, {Parent, Target} = Acc) ->
+ % Immediately flush documents with attachments received from a remote
+ % source. The data property of each attachment is a function that starts
+ % streaming the attachment data from the remote source, therefore it's
+ % convenient to call it ASAP to avoid ibrowse inactivity timeouts.
+ Stats = #rep_stats{docs_read = 1},
+ ?LOG_DEBUG("Worker flushing doc with attachments", []),
+ Target2 = open_db(Target),
+ Success = (flush_doc(Target2, Doc) =:= ok),
+ close_db(Target2),
+ {Result, Stats2} = case Success of
+ true ->
+ {{ok, Acc}, ?inc_stat(#rep_stats.docs_written, Stats, 1)};
+ false ->
+ {{skip, Acc}, ?inc_stat(#rep_stats.doc_write_failures, Stats, 1)}
+ end,
+ ok = gen_server:call(Parent, {add_stats, Stats2}, infinity),
+ Result;
+remote_doc_handler(_, Acc) ->
+ {ok, Acc}.
+
+
+%% @doc Spawns a linked process that writes the batched docs to Target and
+%% reports the resulting stats back to the calling worker with add_stats.
+%% A debug line is logged first when the batch is not empty.
+spawn_writer(Target, #batch{docs = DocList, size = Size}) ->
+    if
+        Size > 0, is_record(Target, httpdb) ->
+            ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]);
+        Size > 0, is_record(Target, db) ->
+            ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size]);
+        true ->
+            ok
+    end,
+    Worker = self(),
+    spawn_link(fun() ->
+        Db = open_db(Target),
+        WriteStats = flush_docs(Db, DocList),
+        close_db(Db),
+        ok = gen_server:call(Worker, {add_stats, WriteStats}, infinity)
+    end).
+
+
+%% @doc Answers the pending flush call with the stats gathered since the
+%% previous flush, restarts the stats timer and clears the batch, stats,
+%% writer and flush waiter.
+after_full_flush(#state{stats = Collected, flush_waiter = From} = State) ->
+    gen_server:reply(From, {ok, Collected}),
+    erlang:put(last_stats_report, now()),
+    Cleared = State#state{
+        batch = #batch{},
+        writer = nil,
+        flush_waiter = nil,
+        stats = #rep_stats{}
+    },
+    Cleared.
+
+
+%% @doc gen_server-side wrapper around maybe_flush_docs/3. It adds Doc to the
+%% batch (possibly flushing it), merges the write stats, counts one doc read
+%% and reports stats to the checkpointer when they are due.
+maybe_flush_docs(Doc, State) ->
+    #state{target = Target, batch = Batch, stats = Stats, cp = Cp} = State,
+    {NewBatch, WriteStats} = maybe_flush_docs(Target, Batch, Doc),
+    Merged = couch_replicator_utils:sum_stats(Stats, WriteStats),
+    WithRead = ?inc_stat(#rep_stats.docs_read, Merged, 1),
+    Reported = maybe_report_stats(Cp, WithRead),
+    State#state{batch = NewBatch, stats = Reported}.
+
+
+% Adds Doc to Batch and flushes the batch when it becomes full.
+% Returns {NewBatch, WriteStats}.
+% Remote target: docs rejected by batch_doc/1 are written on their own and
+% Batch is returned untouched. Other docs are JSON-encoded, and the batch is
+% flushed once its byte size exceeds ?DOC_BUFFER_BYTE_SIZE; the doc that
+% crosses the limit is included in that flush.
+% Local target: #doc{} records are buffered until ?DOC_BUFFER_LEN docs are
+% reached (there is no batch_doc/1 check in this clause).
+maybe_flush_docs(#httpdb{} = Target, Batch, Doc) ->
+ #batch{docs = DocAcc, size = SizeAcc} = Batch,
+ case batch_doc(Doc) of
+ false ->
+ ?LOG_DEBUG("Worker flushing doc with attachments", []),
+ case flush_doc(Target, Doc) of
+ ok ->
+ {Batch, #rep_stats{docs_written = 1}};
+ _ ->
+ {Batch, #rep_stats{doc_write_failures = 1}}
+ end;
+ true ->
+ JsonDoc = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
+ case SizeAcc + iolist_size(JsonDoc) of
+ SizeAcc2 when SizeAcc2 > ?DOC_BUFFER_BYTE_SIZE ->
+ ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [SizeAcc2]),
+ Stats = flush_docs(Target, [JsonDoc | DocAcc]),
+ {#batch{}, Stats};
+ SizeAcc2 ->
+ {#batch{docs = [JsonDoc | DocAcc], size = SizeAcc2}, #rep_stats{}}
+ end
+ end;
+
+maybe_flush_docs(#db{} = Target, #batch{docs = DocAcc, size = SizeAcc}, Doc) ->
+ case SizeAcc + 1 of
+ SizeAcc2 when SizeAcc2 >= ?DOC_BUFFER_LEN ->
+ ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [SizeAcc2]),
+ Stats = flush_docs(Target, [Doc | DocAcc]),
+ {#batch{}, Stats};
+ SizeAcc2 ->
+ {#batch{docs = [Doc | DocAcc], size = SizeAcc2}, #rep_stats{}}
+ end.
+
+
+%% @doc True when Doc may be written as part of a bulk batch. That holds if
+%% it has no attachments, or at most ?MAX_BULK_ATTS_PER_DOC attachments that
+%% are each non-stub and no larger than ?MAX_BULK_ATT_SIZE on disk.
+batch_doc(#doc{atts = []}) ->
+    true;
+batch_doc(#doc{atts = Atts}) ->
+    IsSmallInline = fun(#att{disk_len = Len, data = Data}) ->
+        Data =/= stub andalso Len =< ?MAX_BULK_ATT_SIZE
+    end,
+    case length(Atts) =< ?MAX_BULK_ATTS_PER_DOC of
+        true -> lists:all(IsSmallInline, Atts);
+        false -> false
+    end.
+
+
+%% @doc Writes DocList to Target in one bulk update. Each per-document error
+%% reported by the target is logged. Returns a #rep_stats{} with the written
+%% and failed counts.
+flush_docs(_Target, []) ->
+    #rep_stats{};
+flush_docs(Target, DocList) ->
+    {ok, Failures} = couch_api_wrap:update_docs(
+        Target, DocList, [delay_commit], replicated_changes),
+    TargetUri = couch_api_wrap:db_uri(Target),
+    LogFailure = fun({ErrProps}) ->
+        ?LOG_ERROR("Replicator: couldn't write document `~s`, revision `~s`,"
+            " to target database `~s`. Error: `~s`, reason: `~s`.",
+            [get_value(id, ErrProps, ""), get_value(rev, ErrProps, ""), TargetUri,
+                get_value(error, ErrProps, ""), get_value(reason, ErrProps, "")])
+    end,
+    lists:foreach(LogFailure, Failures),
+    NumFailed = length(Failures),
+    #rep_stats{
+        docs_written = length(DocList) - NumFailed,
+        doc_write_failures = NumFailed
+    }.
+
+% Writes a single (non-batched) document to Target. Returns ok on success;
+% otherwise the error is logged and the failure value returned: either the
+% non-{ok, _} result of update_doc, or {error, Error} for a thrown error.
+% {missing_stub, _} is re-thrown unchanged so that fetch_doc/4 can retry the
+% fetch without atts_since.
+flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
+ try couch_api_wrap:update_doc(Target, Doc, [], replicated_changes) of
+ {ok, _} ->
+ ok;
+ Error ->
+ ?LOG_ERROR("Replicator: error writing document `~s` to `~s`: ~s",
+ [Id, couch_api_wrap:db_uri(Target), couch_util:to_binary(Error)]),
+ Error
+ catch
+ throw:{missing_stub, _} = MissingStub ->
+ throw(MissingStub);
+ throw:{Error, Reason} ->
+ ?LOG_ERROR("Replicator: couldn't write document `~s`, revision `~s`,"
+ " to target database `~s`. Error: `~s`, reason: `~s`.",
+ [Id, couch_doc:rev_to_str({Pos, RevId}),
+ couch_api_wrap:db_uri(Target), to_binary(Error), to_binary(Reason)]),
+ {error, Error};
+ throw:Err ->
+ ?LOG_ERROR("Replicator: couldn't write document `~s`, revision `~s`,"
+ " to target database `~s`. Error: `~s`.",
+ [Id, couch_doc:rev_to_str({Pos, RevId}),
+ couch_api_wrap:db_uri(Target), to_binary(Err)]),
+ {error, Err}
+ end.
+
+
+%% @doc Asks Target which revisions listed in the #doc_info{} records are
+%% missing. Returns {Missing, Stats}: Missing is the {Id, MissingRevs, PAs}
+%% list from get_missing_revs, and Stats counts the revisions checked and
+%% the revisions found missing.
+find_missing(DocInfos, Target) ->
+    IdRevs = lists:map(
+        fun(#doc_info{id = Id, revs = RevInfos}) ->
+            {Id, [Rev || #rev_info{rev = Rev} <- RevInfos]}
+        end,
+        DocInfos),
+    Checked = lists:sum(lists:map(fun({_Id, Revs}) -> length(Revs) end, IdRevs)),
+    {ok, Missing} = couch_api_wrap:get_missing_revs(Target, IdRevs),
+    Found = lists:sum(
+        lists:map(fun({_Id, MissingRevs, _PAs}) -> length(MissingRevs) end, Missing)),
+    {Missing, #rep_stats{missing_checked = Checked, missing_found = Found}}.
+
+
+%% @doc Periodically forwards accumulated stats to the checkpointer Cp.
+%%
+%% The process dictionary key `last_stats_report' must already hold a
+%% now()-style timestamp; start_link/5, init/1 and the fetch loop set it.
+%% If at least ?STATS_DELAY microseconds have passed since that timestamp,
+%% Stats is sent to Cp as {add_stats, Stats}, the timestamp is reset and an
+%% empty #rep_stats{} is returned so the caller starts a fresh accumulation.
+%% Otherwise Stats is returned unchanged.
+maybe_report_stats(Cp, Stats) ->
+    Now = now(),
+    % timer:now_diff(T2, T1) returns T2 - T1, so the time elapsed since the
+    % last report is now_diff(Now, Last). The reversed argument order gives
+    % a difference that is never positive, and stats would never be sent.
+    Elapsed = timer:now_diff(Now, erlang:get(last_stats_report)),
+    case Elapsed >= ?STATS_DELAY of
+        true ->
+            ok = gen_server:call(Cp, {add_stats, Stats}, infinity),
+            erlang:put(last_stats_report, Now),
+            #rep_stats{};
+        false ->
+            Stats
+    end.
http://git-wip-us.apache.org/repos/asf/couchdb/blob/ad526790/src/couch_replicator/test/001-httpc-pool.t
----------------------------------------------------------------------
diff --git a/src/couch_replicator/test/001-httpc-pool.t b/src/couch_replicator/test/001-httpc-pool.t
new file mode 100755
index 0000000..90adcca
--- /dev/null
+++ b/src/couch_replicator/test/001-httpc-pool.t
@@ -0,0 +1,250 @@
+#!/usr/bin/env escript
+%% -*- erlang -*-
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+% escript entry point: prepares the code path, declares the 55 etap
+% assertions made by the scenarios below, and runs test/0. Any return other
+% than ok (including a caught exception) is reported and aborts the plan.
+main(_Args) ->
+    test_util:init_code_path(),
+
+    etap:plan(55),
+    case catch test() of
+        ok ->
+            etap:end_tests();
+        Failure ->
+            etap:diag(io_lib:format("Test died abnormally: ~p", [Failure])),
+            etap:bail(Failure)
+    end,
+    ok.
+
+
+% Boots the CouchDB server and ibrowse, runs every pool scenario in order,
+% then stops the server. Returns ok when all scenarios complete.
+test() ->
+    couch_server_sup:start_link(test_util:config_files()),
+    ibrowse:start(),
+
+    Scenarios = [
+        fun test_pool_full/0,
+        fun test_worker_dead_pool_non_full/0,
+        fun test_worker_dead_pool_full/0
+    ],
+    lists:foreach(fun(Scenario) -> Scenario() end, Scenarios),
+
+    couch_server_sup:stop(),
+    ok.
+
+
+% Scenario: with a pool limited to three connections (spawn_pool/0 passes
+% {max_connections, 3}), three clients get three distinct live workers, a
+% fourth client blocks, and stopping client 1 hands client 1's worker to the
+% blocked client. Makes 14 of the 55 assertions planned in main/1.
+test_pool_full() ->
+ Pool = spawn_pool(),
+ Client1 = spawn_client(Pool),
+ Client2 = spawn_client(Pool),
+ Client3 = spawn_client(Pool),
+
+ etap:diag("Check that we can spawn the max number of connections."),
+ etap:is(ping_client(Client1), ok, "Client 1 started ok."),
+ etap:is(ping_client(Client2), ok, "Client 2 started ok."),
+ etap:is(ping_client(Client3), ok, "Client 3 started ok."),
+
+ Worker1 = get_client_worker(Client1, "1"),
+ Worker2 = get_client_worker(Client2, "2"),
+ Worker3 = get_client_worker(Client3, "3"),
+ etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
+ etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
+ etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."),
+
+ etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."),
+ etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."),
+ etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."),
+
+ etap:diag("Check that client 4 blocks waiting for a worker."),
+ Client4 = spawn_client(Pool),
+ % A client only answers ping once get_worker has returned and it entered
+ % loop/4, so a timeout here means it is still waiting for a worker.
+ etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."),
+
+ etap:diag("Check that stopping a client gives up its worker."),
+ etap:is(stop_client(Client1), ok, "First client stopped."),
+
+ etap:diag("And check that our blocked client has been unblocked."),
+ etap:is(ping_client(Client4), ok, "Client 4 was unblocked."),
+
+ Worker4 = get_client_worker(Client4, "4"),
+ etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."),
+ % The released worker is reused rather than a new one being started.
+ etap:is(Worker4, Worker1, "Client 4 got worker that client 1 got before."),
+
+ lists:foreach(fun(C) -> ok = stop_client(C) end, [Client2, Client3, Client4]),
+ stop_pool(Pool).
+
+
+% Scenario: with spare pool capacity, a client whose worker was killed can
+% still be stopped cleanly, and the next client receives a different, live
+% worker. Makes 9 of the 55 assertions planned in main/1.
+test_worker_dead_pool_non_full() ->
+ Pool = spawn_pool(),
+ Client1 = spawn_client(Pool),
+
+ etap:is(ping_client(Client1), ok, "Client 1 started ok."),
+ Worker1 = get_client_worker(Client1, "1"),
+ etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
+
+ etap:diag("Kill client's 1 worker."),
+ etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."),
+ % This process sent the kill signal itself, so is_process_alive/1 is
+ % guaranteed to observe its effect.
+ etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."),
+
+ % Releasing the already-dead worker back to the pool must still succeed.
+ etap:is(stop_client(Client1), ok, "First client stopped and released its worker."),
+
+ Client2 = spawn_client(Pool),
+ etap:is(ping_client(Client2), ok, "Client 2 started ok."),
+ Worker2 = get_client_worker(Client2, "2"),
+ etap:isnt(Worker2, Worker1, "Client 2 got a different worker from client 1"),
+ etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
+
+ etap:is(stop_client(Client2), ok, "Second client stopped."),
+ stop_pool(Pool).
+
+
+% Scenario: with the pool at its three-connection limit, killing one worker
+% unblocks a waiting client with a brand-new worker. Stopping the client
+% whose worker died leaves the other workers untouched. A normally released
+% worker is then handed to the next blocked client. Makes 32 of the 55
+% assertions planned in main/1.
+test_worker_dead_pool_full() ->
+ Pool = spawn_pool(),
+ Client1 = spawn_client(Pool),
+ Client2 = spawn_client(Pool),
+ Client3 = spawn_client(Pool),
+
+ etap:diag("Check that we can spawn the max number of connections."),
+ etap:is(ping_client(Client1), ok, "Client 1 started ok."),
+ etap:is(ping_client(Client2), ok, "Client 2 started ok."),
+ etap:is(ping_client(Client3), ok, "Client 3 started ok."),
+
+ Worker1 = get_client_worker(Client1, "1"),
+ Worker2 = get_client_worker(Client2, "2"),
+ Worker3 = get_client_worker(Client3, "3"),
+ etap:is(is_process_alive(Worker1), true, "Client's 1 worker is alive."),
+ etap:is(is_process_alive(Worker2), true, "Client's 2 worker is alive."),
+ etap:is(is_process_alive(Worker3), true, "Client's 3 worker is alive."),
+
+ etap:isnt(Worker1, Worker2, "Clients 1 and 2 got different workers."),
+ etap:isnt(Worker2, Worker3, "Clients 2 and 3 got different workers."),
+ etap:isnt(Worker1, Worker3, "Clients 1 and 3 got different workers."),
+
+ etap:diag("Check that client 4 blocks waiting for a worker."),
+ Client4 = spawn_client(Pool),
+ % A timeout means the client is still inside get_worker (not yet in loop/4).
+ etap:is(ping_client(Client4), timeout, "Client 4 blocked while waiting."),
+
+ etap:diag("Kill client's 1 worker."),
+ etap:is(kill_client_worker(Client1), ok, "Killed client's 1 worker."),
+ etap:is(is_process_alive(Worker1), false, "Client's 1 worker process is dead."),
+
+ % The dead worker's slot must be freed without client 1 releasing it.
+ etap:diag("Check client 4 got unblocked after first worker's death"),
+ etap:is(ping_client(Client4), ok, "Client 4 not blocked anymore."),
+
+ Worker4 = get_client_worker(Client4, "4"),
+ etap:is(is_process_alive(Worker4), true, "Client's 4 worker is alive."),
+ etap:isnt(Worker4, Worker1, "Client 4 got a worker different from client 1."),
+ etap:isnt(Worker4, Worker2, "Client 4 got a worker different from client 2."),
+ etap:isnt(Worker4, Worker3, "Client 4 got a worker different from client 3."),
+
+ % Releasing the dead Worker1 must not disturb any live worker.
+ etap:diag("Check that stopping client 1 is a noop."),
+ etap:is(stop_client(Client1), ok, "First client stopped."),
+
+ etap:is(is_process_alive(Worker2), true, "Client's 2 worker still alive."),
+ etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."),
+ etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."),
+
+ % Pool is still full (workers 2, 3 and 4 are checked out).
+ etap:diag("Check that client 5 blocks waiting for a worker."),
+ Client5 = spawn_client(Pool),
+ etap:is(ping_client(Client5), timeout, "Client 5 blocked while waiting."),
+
+ etap:diag("Check that stopping client 2 gives up its worker."),
+ etap:is(stop_client(Client2), ok, "Second client stopped."),
+
+ etap:diag("Now check that client 5 has been unblocked."),
+ etap:is(ping_client(Client5), ok, "Client 5 was unblocked."),
+
+ % Client 5 is expected to reuse Worker2, the worker client 2 released.
+ Worker5 = get_client_worker(Client5, "5"),
+ etap:is(is_process_alive(Worker5), true, "Client's 5 worker is alive."),
+ etap:isnt(Worker5, Worker1, "Client 5 got a worker different from client 1."),
+ etap:is(Worker5, Worker2, "Client 5 got same worker as client 2."),
+ etap:isnt(Worker5, Worker3, "Client 5 got a worker different from client 3."),
+ etap:isnt(Worker5, Worker4, "Client 5 got a worker different from client 4."),
+
+ etap:is(is_process_alive(Worker3), true, "Client's 3 worker still alive."),
+ etap:is(is_process_alive(Worker4), true, "Client's 4 worker still alive."),
+ etap:is(is_process_alive(Worker5), true, "Client's 5 worker still alive."),
+
+ lists:foreach(fun(C) -> ok = stop_client(C) end, [Client3, Client4, Client5]),
+ stop_pool(Pool).
+
+
+% Spawns a client process that checks a worker out of Pool and then serves
+% ping / get_worker / stop requests via loop/4. Returns {ClientPid, Ref};
+% every reply the client sends back to the caller is tagged with Ref.
+spawn_client(Pool) ->
+    Parent = self(),
+    Ref = make_ref(),
+    ClientBody = fun() ->
+        {ok, Worker} = couch_httpc_pool:get_worker(Pool),
+        loop(Parent, Ref, Worker, Pool)
+    end,
+    ClientPid = spawn(ClientBody),
+    {ClientPid, Ref}.
+
+
+% Pings a client. Returns ok if it answers within 3 seconds, timeout
+% otherwise (e.g. while it is still blocked waiting for a pool worker).
+ping_client({ClientPid, ClientRef}) ->
+    ClientPid ! ping,
+    receive
+        {pong, ClientRef} -> ok
+    after 3000 -> timeout
+    end.
+
+
+% Returns the worker pid held by a client. If the client does not answer
+% within 3 seconds the whole etap run is aborted; ClientName (a string) only
+% labels that bail message.
+get_client_worker({ClientPid, ClientRef}, ClientName) ->
+    ClientPid ! get_worker,
+    receive
+        {worker, ClientRef, WorkerPid} -> WorkerPid
+    after 3000 ->
+        BailMsg = "Timeout getting client " ++ ClientName ++ " worker.",
+        etap:bail(BailMsg)
+    end.
+
+
+% Tells a client to release its worker to the pool and exit. Returns ok
+% once the client confirms, or timeout after 3 seconds.
+stop_client({ClientPid, ClientRef}) ->
+    ClientPid ! stop,
+    receive
+        {stop, ClientRef} -> ok
+    after 3000 -> timeout
+    end.
+
+
+% Fetches a client's worker and kills it unconditionally (exit reason kill)
+% from the calling process. Returns ok, or timeout if the client does not
+% answer within 3 seconds. The client itself is left running.
+kill_client_worker({ClientPid, ClientRef}) ->
+    ClientPid ! get_worker,
+    receive
+        {worker, ClientRef, WorkerPid} ->
+            exit(WorkerPid, kill),
+            ok
+    after 3000 -> timeout
+    end.
+
+
+% Client request loop. Answers ping with {pong, Ref} and get_worker with
+% {worker, Ref, Worker}. On stop it releases Worker back to Pool, confirms
+% with {stop, Ref} and returns, which ends the client process. Any other
+% message is left unread in the mailbox.
+loop(Parent, Ref, Worker, Pool) ->
+    receive
+        stop ->
+            couch_httpc_pool:release_worker(Pool, Worker),
+            Parent ! {stop, Ref};
+        Request when Request =:= ping; Request =:= get_worker ->
+            Reply = case Request of
+                ping -> {pong, Ref};
+                get_worker -> {worker, Ref, Worker}
+            end,
+            Parent ! Reply,
+            loop(Parent, Ref, Worker, Pool)
+    end.
+
+
+% Starts an HTTP connection pool limited to 3 connections, addressed at the
+% httpd bind_address/port taken from the configuration (defaults
+% 127.0.0.1:5984). Returns the pool pid; crashes if the pool fails to start.
+spawn_pool() ->
+    Host = couch_config:get("httpd", "bind_address", "127.0.0.1"),
+    Port = couch_config:get("httpd", "port", "5984"),
+    % Use the configured port: it was previously read but ignored in favour of
+    % a hard-coded ":5984" (and produced an unused-variable warning).
+    % NOTE(review): if the test config sets port 0 (ephemeral), this URL does
+    % not point at the running server. The scenarios in this file never issue
+    % HTTP requests through the workers, though; confirm if that changes.
+    {ok, Pool} = couch_httpc_pool:start_link(
+        "http://" ++ Host ++ ":" ++ Port, [{max_connections, 3}]),
+    Pool.
+
+
+% Stops a pool created by spawn_pool/0; crashes (badmatch) unless the stop
+% returns ok, and returns ok otherwise.
+stop_pool(Pool) ->
+    StopResult = couch_httpc_pool:stop(Pool),
+    ok = StopResult.