Posted to commits@couchdb.apache.org by be...@apache.org on 2014/02/13 19:12:45 UTC
[47/57] [abbrv] couchdb commit: updated
refs/heads/1994-merge-rcouch-multi-repo to b19d055
remove couch_replicator
Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/4c39d7a5
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/4c39d7a5
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/4c39d7a5
Branch: refs/heads/1994-merge-rcouch-multi-repo
Commit: 4c39d7a58ce3f3af8ffa6c07fa2a973ae8de15f9
Parents: 20a68fe
Author: Benoit Chesneau <bc...@gmail.com>
Authored: Thu Feb 13 16:39:13 2014 +0100
Committer: Benoit Chesneau <bc...@gmail.com>
Committed: Thu Feb 13 16:39:13 2014 +0100
----------------------------------------------------------------------
.../src/couch_replicator.app.src | 28 -
apps/couch_replicator/src/couch_replicator.erl | 996 -------------------
apps/couch_replicator/src/couch_replicator.hrl | 32 -
.../src/couch_replicator_api_wrap.erl | 900 -----------------
.../src/couch_replicator_api_wrap.hrl | 36 -
.../src/couch_replicator_app.erl | 29 -
.../src/couch_replicator_httpc.erl | 297 ------
.../src/couch_replicator_httpc_pool.erl | 138 ---
.../src/couch_replicator_httpd.erl | 66 --
.../src/couch_replicator_job_sup.erl | 31 -
.../src/couch_replicator_js_functions.hrl | 151 ---
.../src/couch_replicator_manager.erl | 709 -------------
.../src/couch_replicator_manager_sup.erl | 47 -
.../src/couch_replicator_notifier.erl | 57 --
.../src/couch_replicator_sup.erl | 63 --
.../src/couch_replicator_utils.erl | 419 --------
.../src/couch_replicator_worker.erl | 515 ----------
apps/couch_replicator/test/01-load.t | 38 -
apps/couch_replicator/test/02-httpc-pool.t | 249 -----
.../test/03-replication-compact.t | 489 ---------
.../test/04-replication-large-atts.t | 266 -----
.../test/05-replication-many-leaves.t | 295 ------
.../test/06-doc-missing-stubs.t | 303 ------
apps/couch_replicator/test/07-use-checkpoints.t | 255 -----
24 files changed, 6409 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb/blob/4c39d7a5/apps/couch_replicator/src/couch_replicator.app.src
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator.app.src b/apps/couch_replicator/src/couch_replicator.app.src
deleted file mode 100644
index 3e71b3b..0000000
--- a/apps/couch_replicator/src/couch_replicator.app.src
+++ /dev/null
@@ -1,28 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-{application, couch_replicator, [
- {description, "CouchDB replicator"},
- {vsn, "1.6.1"},
- {modules, []},
- {registered, [
- couch_replicator_manager_sup,
- couch_replicator_job_sup,
- couch_replicator_sup
- ]},
- {applications, [kernel, stdlib, crypto, sasl, inets, oauth, ibrowse,
- couch]},
-
- {mod, { couch_replicator_app, []}},
- {env, []}
-]}.
-
http://git-wip-us.apache.org/repos/asf/couchdb/blob/4c39d7a5/apps/couch_replicator/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator.erl b/apps/couch_replicator/src/couch_replicator.erl
deleted file mode 100644
index 10aaf37..0000000
--- a/apps/couch_replicator/src/couch_replicator.erl
+++ /dev/null
@@ -1,996 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator).
--behaviour(gen_server).
-
-% public API
--export([replicate/1]).
-
-% meant to be used only by the replicator database listener
--export([async_replicate/1]).
--export([cancel_replication/1]).
-
-% gen_server callbacks
--export([init/1, terminate/2, code_change/3]).
--export([handle_call/3, handle_cast/2, handle_info/2]).
-
--include_lib("couch/include/couch_db.hrl").
--include("couch_replicator_api_wrap.hrl").
--include("couch_replicator.hrl").
-
--import(couch_util, [
- get_value/2,
- get_value/3,
- to_binary/1
-]).
-
--import(couch_replicator_utils, [
- start_db_compaction_notifier/2,
- stop_db_compaction_notifier/1
-]).
-
--record(rep_state, {
- rep_details,
- source_name,
- target_name,
- source,
- target,
- history,
- checkpoint_history,
- start_seq,
- committed_seq,
- current_through_seq,
- seqs_in_progress = [],
- highest_seq_done = {0, ?LOWEST_SEQ},
- source_log,
- target_log,
- rep_starttime,
- src_starttime,
- tgt_starttime,
- timer, % checkpoint timer
- changes_queue,
- changes_manager,
- changes_reader,
- workers,
- stats = #rep_stats{},
- session_id,
- source_db_compaction_notifier = nil,
- target_db_compaction_notifier = nil,
- source_monitor = nil,
- target_monitor = nil,
- source_seq = nil,
- use_checkpoints = true,
- checkpoint_interval = 5000,
- type = db,
- view = nil
-}).
-
-
-replicate(#rep{id = RepId, options = Options, user_ctx = UserCtx} = Rep) ->
- case get_value(cancel, Options, false) of
- true ->
- case get_value(id, Options, nil) of
- nil ->
- cancel_replication(RepId);
- RepId2 ->
- cancel_replication(RepId2, UserCtx)
- end;
- false ->
- {ok, Listener} = rep_result_listener(RepId),
- Result = do_replication_loop(Rep),
- couch_replicator_notifier:stop(Listener),
- Result
- end.
-
-
-do_replication_loop(#rep{id = {BaseId, Ext} = Id, options = Options} = Rep) ->
- case async_replicate(Rep) of
- {ok, _Pid} ->
- case get_value(continuous, Options, false) of
- true ->
- {ok, {continuous, ?l2b(BaseId ++ Ext)}};
- false ->
- wait_for_result(Id)
- end;
- Error ->
- Error
- end.
-
-
-async_replicate(#rep{id = {BaseId, Ext}, source = Src, target = Tgt} = Rep) ->
- RepChildId = BaseId ++ Ext,
- Source = couch_replicator_api_wrap:db_uri(Src),
- Target = couch_replicator_api_wrap:db_uri(Tgt),
- Timeout = get_value(connection_timeout, Rep#rep.options),
- ChildSpec = {
- RepChildId,
- {gen_server, start_link, [?MODULE, Rep, [{timeout, Timeout}]]},
- temporary,
- 250,
- worker,
- [?MODULE]
- },
- % All these nested cases to attempt starting/restarting a replication child
- % are ugly and not 100% race condition free. The following patch submission
- % is a solution:
- %
- % http://erlang.2086793.n4.nabble.com/PATCH-supervisor-atomically-delete-child-spec-when-child-terminates-td3226098.html
- %
- case supervisor:start_child(couch_replicator_job_sup, ChildSpec) of
- {ok, Pid} ->
- ?LOG_INFO("starting new replication `~s` at ~p (`~s` -> `~s`)",
- [RepChildId, Pid, Source, Target]),
- {ok, Pid};
- {error, already_present} ->
- case supervisor:restart_child(couch_replicator_job_sup, RepChildId) of
- {ok, Pid} ->
- ?LOG_INFO("restarting replication `~s` at ~p (`~s` -> `~s`)",
- [RepChildId, Pid, Source, Target]),
- {ok, Pid};
- {error, running} ->
- %% this error occurs if multiple replicators are racing
- %% each other to start and somebody else won. Just grab
- %% the Pid by calling start_child again.
- {error, {already_started, Pid}} =
- supervisor:start_child(couch_replicator_job_sup, ChildSpec),
- ?LOG_INFO("replication `~s` already running at ~p (`~s` -> `~s`)",
- [RepChildId, Pid, Source, Target]),
- {ok, Pid};
- {error, {'EXIT', {badarg,
- [{erlang, apply, [gen_server, start_link, undefined]} | _]}}} ->
- % Clause to deal with a change in the supervisor module introduced
- % in R14B02. For more details consult the thread at:
- % http://erlang.org/pipermail/erlang-bugs/2011-March/002273.html
- _ = supervisor:delete_child(couch_replicator_job_sup, RepChildId),
- async_replicate(Rep);
- {error, _} = Error ->
- Error
- end;
- {error, {already_started, Pid}} ->
- ?LOG_INFO("replication `~s` already running at ~p (`~s` -> `~s`)",
- [RepChildId, Pid, Source, Target]),
- {ok, Pid};
- {error, {Error, _}} ->
- {error, Error}
- end.
-
-
-rep_result_listener(RepId) ->
- ReplyTo = self(),
- {ok, _Listener} = couch_replicator_notifier:start_link(
- fun({_, RepId2, _} = Ev) when RepId2 =:= RepId ->
- ReplyTo ! Ev;
- (_) ->
- ok
- end).
-
-
-wait_for_result(RepId) ->
- receive
- {finished, RepId, RepResult} ->
- {ok, RepResult};
- {error, RepId, Reason} ->
- {error, Reason}
- end.
-
-
-cancel_replication({BaseId, Extension}) ->
- FullRepId = BaseId ++ Extension,
- ?LOG_INFO("Canceling replication `~s`...", [FullRepId]),
- case supervisor:terminate_child(couch_replicator_job_sup, FullRepId) of
- ok ->
- ?LOG_INFO("Replication `~s` canceled.", [FullRepId]),
- case supervisor:delete_child(couch_replicator_job_sup, FullRepId) of
- ok ->
- {ok, {cancelled, ?l2b(FullRepId)}};
- {error, not_found} ->
- {ok, {cancelled, ?l2b(FullRepId)}};
- Error ->
- Error
- end;
- Error ->
- ?LOG_ERROR("Error canceling replication `~s`: ~p", [FullRepId, Error]),
- Error
- end.
-
-cancel_replication(RepId, #user_ctx{name = Name, roles = Roles}) ->
- case lists:member(<<"_admin">>, Roles) of
- true ->
- cancel_replication(RepId);
- false ->
- {BaseId, Ext} = RepId,
- case lists:keysearch(
- BaseId ++ Ext, 1, supervisor:which_children(couch_replicator_job_sup)) of
- {value, {_, Pid, _, _}} when is_pid(Pid) ->
- case (catch gen_server:call(Pid, get_details, infinity)) of
- {ok, #rep{user_ctx = #user_ctx{name = Name}}} ->
- cancel_replication(RepId);
- {ok, _} ->
- throw({unauthorized,
- <<"Can't cancel a replication triggered by another user">>});
- {'EXIT', {noproc, {gen_server, call, _}}} ->
- {error, not_found};
- Error ->
- throw(Error)
- end;
- _ ->
- {error, not_found}
- end
- end.
-
-init(InitArgs) ->
- try
- do_init(InitArgs)
- catch
- throw:{unauthorized, DbUri} ->
- {stop, {unauthorized,
- <<"unauthorized to access or create database ", DbUri/binary>>}};
- throw:{db_not_found, DbUri} ->
- {stop, {db_not_found, <<"could not open ", DbUri/binary>>}};
- throw:Error ->
- {stop, Error}
- end.
-
-do_init(#rep{options = Options, id = {BaseId, Ext}} = Rep) ->
- process_flag(trap_exit, true),
-
- #rep_state{
- source = Source,
- target = Target,
- source_name = SourceName,
- target_name = TargetName,
- start_seq = {_Ts, StartSeq},
- source_seq = SourceCurSeq,
- committed_seq = {_, CommittedSeq},
- checkpoint_interval = CheckpointInterval
- } = State = init_state(Rep),
-
- NumWorkers = get_value(worker_processes, Options),
- BatchSize = get_value(worker_batch_size, Options),
- {ok, ChangesQueue} = couch_work_queue:new([
- {max_items, BatchSize * NumWorkers * 2},
- {max_size, 100 * 1024 * NumWorkers}
- ]),
- % This starts the _changes reader process. It adds the changes from
- % the source db to the ChangesQueue.
- ChangesReader = spawn_changes_reader(StartSeq, Source, ChangesQueue, Options),
- % Changes manager - responsible for dequeuing batches from the changes queue
- % and delivering them to the worker processes.
- ChangesManager = spawn_changes_manager(self(), ChangesQueue, BatchSize),
- % This starts the worker processes. They ask the changes queue manager for
- % a batch of _changes rows to process, check which revs are missing in the
- % target, and copy the missing ones from the source to the target.
- MaxConns = get_value(http_connections, Options),
- Workers = lists:map(
- fun(_) ->
- {ok, Pid} = couch_replicator_worker:start_link(
- self(), Source, Target, ChangesManager, MaxConns),
- Pid
- end,
- lists:seq(1, NumWorkers)),
-
- couch_task_status:add_task([
- {type, replication},
- {replication_id, ?l2b(BaseId ++ Ext)},
- {doc_id, Rep#rep.doc_id},
- {source, ?l2b(SourceName)},
- {target, ?l2b(TargetName)},
- {continuous, get_value(continuous, Options, false)},
- {revisions_checked, 0},
- {missing_revisions_found, 0},
- {docs_read, 0},
- {docs_written, 0},
- {doc_write_failures, 0},
- {source_seq, SourceCurSeq},
- {checkpointed_source_seq, CommittedSeq},
- {progress, 0},
- {checkpoint_interval, CheckpointInterval}
- ]),
- couch_task_status:set_update_frequency(1000),
-
- % Until OTP R14B03:
- %
- % Restarting a temporary supervised child implies that the original arguments
- % (#rep{} record) specified in the MFA component of the supervisor
- % child spec will always be used whenever the child is restarted.
- % This implies the same replication performance tuning parameters will
- % always be used. The solution is to delete the child spec (see
- % cancel_replication/1) and then start the replication again, but this is
- % unfortunately not immune to race conditions.
-
- ?LOG_INFO("Replication `~p` is using:~n"
- "~c~p worker processes~n"
- "~ca worker batch size of ~p~n"
- "~c~p HTTP connections~n"
- "~ca connection timeout of ~p milliseconds~n"
- "~c~p retries per request~n"
- "~csocket options are: ~s~s",
- [BaseId ++ Ext, $\t, NumWorkers, $\t, BatchSize, $\t,
- MaxConns, $\t, get_value(connection_timeout, Options),
- $\t, get_value(retries, Options),
- $\t, io_lib:format("~p", [get_value(socket_options, Options)]),
- case StartSeq of
- ?LOWEST_SEQ ->
- "";
- _ ->
- io_lib:format("~n~csource start sequence ~p", [$\t, StartSeq])
- end]),
-
- ?LOG_DEBUG("Worker pids are: ~p", [Workers]),
-
- couch_replicator_manager:replication_started(Rep),
-
- {ok, State#rep_state{
- changes_queue = ChangesQueue,
- changes_manager = ChangesManager,
- changes_reader = ChangesReader,
- workers = Workers
- }
- }.
-
-
-handle_info(shutdown, St) ->
- {stop, shutdown, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{source_monitor = Ref} = St) ->
- ?LOG_ERROR("Source database is down. Reason: ~p", [Why]),
- {stop, source_db_down, St};
-
-handle_info({'DOWN', Ref, _, _, Why}, #rep_state{target_monitor = Ref} = St) ->
- ?LOG_ERROR("Target database is down. Reason: ~p", [Why]),
- {stop, target_db_down, St};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_reader=Pid} = State) ->
- {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_reader=Pid} = State) ->
- ?LOG_ERROR("ChangesReader process died with reason: ~p", [Reason]),
- {stop, changes_reader_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_manager = Pid} = State) ->
- {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_manager = Pid} = State) ->
- ?LOG_ERROR("ChangesManager process died with reason: ~p", [Reason]),
- {stop, changes_manager_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{changes_queue=Pid} = State) ->
- {noreply, State};
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{changes_queue=Pid} = State) ->
- ?LOG_ERROR("ChangesQueue process died with reason: ~p", [Reason]),
- {stop, changes_queue_died, cancel_timer(State)};
-
-handle_info({'EXIT', Pid, normal}, #rep_state{workers = Workers} = State) ->
- case Workers -- [Pid] of
- Workers ->
- {stop, {unknown_process_died, Pid, normal}, State};
- [] ->
- catch unlink(State#rep_state.changes_manager),
- catch exit(State#rep_state.changes_manager, kill),
- do_last_checkpoint(State);
- Workers2 ->
- {noreply, State#rep_state{workers = Workers2}}
- end;
-
-handle_info({'EXIT', Pid, Reason}, #rep_state{workers = Workers} = State) ->
- State2 = cancel_timer(State),
- case lists:member(Pid, Workers) of
- false ->
- {stop, {unknown_process_died, Pid, Reason}, State2};
- true ->
- ?LOG_ERROR("Worker ~p died with reason: ~p", [Pid, Reason]),
- {stop, {worker_died, Pid, Reason}, State2}
- end.
-
-
-handle_call(get_details, _From, #rep_state{rep_details = Rep} = State) ->
- {reply, {ok, Rep}, State};
-
-handle_call({add_stats, Stats}, From, State) ->
- gen_server:reply(From, ok),
- NewStats = couch_replicator_utils:sum_stats(State#rep_state.stats, Stats),
- {noreply, State#rep_state{stats = NewStats}};
-
-handle_call({report_seq_done, Seq, StatsInc}, From,
- #rep_state{seqs_in_progress = SeqsInProgress, highest_seq_done = HighestDone,
- current_through_seq = ThroughSeq, stats = Stats} = State) ->
- gen_server:reply(From, ok),
- {NewThroughSeq0, NewSeqsInProgress} = case SeqsInProgress of
- [Seq | Rest] ->
- {Seq, Rest};
- [_ | _] ->
- {ThroughSeq, ordsets:del_element(Seq, SeqsInProgress)}
- end,
- NewHighestDone = lists:max([HighestDone, Seq]),
- NewThroughSeq = case NewSeqsInProgress of
- [] ->
- lists:max([NewThroughSeq0, NewHighestDone]);
- _ ->
- NewThroughSeq0
- end,
- ?LOG_DEBUG("Worker reported seq ~p, through seq was ~p, "
- "new through seq is ~p, highest seq done was ~p, "
- "new highest seq done is ~p~n"
- "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
- [Seq, ThroughSeq, NewThroughSeq, HighestDone,
- NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
- SourceCurSeq = source_cur_seq(State),
- NewState = State#rep_state{
- stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
- current_through_seq = NewThroughSeq,
- seqs_in_progress = NewSeqsInProgress,
- highest_seq_done = NewHighestDone,
- source_seq = SourceCurSeq
- },
- update_task(NewState),
- {noreply, NewState}.
-
-
-handle_cast({db_compacted, DbName},
- #rep_state{source = #db{name = DbName} = Source} = State) ->
- {ok, NewSource} = couch_db:reopen(Source),
- {noreply, State#rep_state{source = NewSource}};
-
-handle_cast({db_compacted, DbName},
- #rep_state{target = #db{name = DbName} = Target} = State) ->
- {ok, NewTarget} = couch_db:reopen(Target),
- {noreply, State#rep_state{target = NewTarget}};
-
-handle_cast(checkpoint, State) ->
- case do_checkpoint(State) of
- {ok, NewState} ->
- {noreply, NewState#rep_state{timer = start_timer(State)}};
- Error ->
- {stop, Error, State}
- end;
-
-handle_cast({report_seq, Seq},
- #rep_state{seqs_in_progress = SeqsInProgress} = State) ->
- NewSeqsInProgress = ordsets:add_element(Seq, SeqsInProgress),
- {noreply, State#rep_state{seqs_in_progress = NewSeqsInProgress}}.
-
-
-code_change(OldVsn, OldState, Extra) when tuple_size(OldState) =:= 30 ->
- code_change(OldVsn, erlang:append_element(OldState, true), Extra);
-code_change(OldVsn, OldState, Extra) when tuple_size(OldState) =:= 31 ->
- code_change(OldVsn, erlang:append_element(OldState, 5000), Extra);
-code_change(_OldVsn, #rep_state{}=State, _Extra) ->
- {ok, State}.
-
-
-terminate(normal, #rep_state{rep_details = #rep{id = RepId} = Rep,
- checkpoint_history = CheckpointHistory} = State) ->
- terminate_cleanup(State),
- couch_replicator_notifier:notify({finished, RepId, CheckpointHistory}),
- couch_replicator_manager:replication_completed(Rep, rep_stats(State));
-
-terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
- % cancelled replication through ?MODULE:cancel_replication/1
- couch_replicator_notifier:notify({error, RepId, <<"cancelled">>}),
- terminate_cleanup(State);
-
-terminate(Reason, State) ->
- #rep_state{
- source_name = Source,
- target_name = Target,
- rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep
- } = State,
- ?LOG_ERROR("Replication `~s` (`~s` -> `~s`) failed: ~s",
- [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
- terminate_cleanup(State),
- couch_replicator_notifier:notify({error, RepId, Reason}),
- couch_replicator_manager:replication_error(Rep, Reason).
-
-
-terminate_cleanup(State) ->
- update_task(State),
- stop_db_compaction_notifier(State#rep_state.source_db_compaction_notifier),
- stop_db_compaction_notifier(State#rep_state.target_db_compaction_notifier),
- couch_replicator_api_wrap:db_close(State#rep_state.source),
- couch_replicator_api_wrap:db_close(State#rep_state.target).
-
-
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
- highest_seq_done = {_Ts, ?LOWEST_SEQ}} = State) ->
- {stop, normal, cancel_timer(State)};
-do_last_checkpoint(#rep_state{seqs_in_progress = [],
- highest_seq_done = Seq} = State) ->
- case do_checkpoint(State#rep_state{current_through_seq = Seq}) of
- {ok, NewState} ->
- {stop, normal, cancel_timer(NewState)};
- Error ->
- {stop, Error, State}
- end.
-
-
-start_timer(State) ->
- After = State#rep_state.checkpoint_interval,
- case timer:apply_after(After, gen_server, cast, [self(), checkpoint]) of
- {ok, Ref} ->
- Ref;
- Error ->
- ?LOG_ERROR("Replicator, error scheduling checkpoint: ~p", [Error]),
- nil
- end.
-
-
-cancel_timer(#rep_state{timer = nil} = State) ->
- State;
-cancel_timer(#rep_state{timer = Timer} = State) ->
- {ok, cancel} = timer:cancel(Timer),
- State#rep_state{timer = nil}.
-
-
-init_state(Rep) ->
- #rep{
- source = Src, target = Tgt,
- options = Options, user_ctx = UserCtx,
- type = Type, view = View
- } = Rep,
- {ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
- {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
- get_value(create_target, Options, false)),
-
- {ok, SourceInfo} = couch_replicator_api_wrap:get_db_info(Source),
- {ok, TargetInfo} = couch_replicator_api_wrap:get_db_info(Target),
-
- [SourceLog, TargetLog] = find_replication_logs([Source, Target], Rep),
-
- {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
- StartSeq1 = get_value(since_seq, Options, StartSeq0),
- StartSeq = {0, StartSeq1},
-
- SourceSeq = case Type of
- db -> get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ);
- view ->
- {DDoc, VName} = View,
- {ok, VInfo} = couch_replicator_api_wrap:get_view_info(Source, DDoc,
- VName),
- get_value(<<"update_seq">>, VInfo, ?LOWEST_SEQ)
- end,
-
-
- #doc{body={CheckpointHistory}} = SourceLog,
- State = #rep_state{
- rep_details = Rep,
- source_name = couch_replicator_api_wrap:db_uri(Source),
- target_name = couch_replicator_api_wrap:db_uri(Target),
- source = Source,
- target = Target,
- history = History,
- checkpoint_history = {[{<<"no_changes">>, true}| CheckpointHistory]},
- start_seq = StartSeq,
- current_through_seq = StartSeq,
- committed_seq = StartSeq,
- source_log = SourceLog,
- target_log = TargetLog,
- rep_starttime = couch_util:rfc1123_date(),
- src_starttime = get_value(<<"instance_start_time">>, SourceInfo),
- tgt_starttime = get_value(<<"instance_start_time">>, TargetInfo),
- session_id = couch_uuids:random(),
- source_db_compaction_notifier =
- start_db_compaction_notifier(Source, self()),
- target_db_compaction_notifier =
- start_db_compaction_notifier(Target, self()),
- source_monitor = db_monitor(Source),
- target_monitor = db_monitor(Target),
- source_seq = SourceSeq,
- use_checkpoints = get_value(use_checkpoints, Options, true),
- checkpoint_interval = get_value(checkpoint_interval, Options,
- 5000),
- type = Type,
- view = View
- },
- State#rep_state{timer = start_timer(State)}.
-
-
-find_replication_logs(DbList, #rep{id = {BaseId, _}} = Rep) ->
- LogId = ?l2b(?LOCAL_DOC_PREFIX ++ BaseId),
- fold_replication_logs(DbList, ?REP_ID_VERSION, LogId, LogId, Rep, []).
-
-
-fold_replication_logs([], _Vsn, _LogId, _NewId, _Rep, Acc) ->
- lists:reverse(Acc);
-
-fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
- case couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body]) of
- {error, <<"not_found">>} when Vsn > 1 ->
- OldRepId = couch_replicator_utils:replication_id(Rep, Vsn - 1),
- fold_replication_logs(Dbs, Vsn - 1,
- ?l2b(?LOCAL_DOC_PREFIX ++ OldRepId), NewId, Rep, Acc);
- {error, <<"not_found">>} ->
- fold_replication_logs(
- Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [#doc{id = NewId} | Acc]);
- {ok, Doc} when LogId =:= NewId ->
- fold_replication_logs(
- Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [Doc | Acc]);
- {ok, Doc} ->
- MigratedLog = #doc{id = NewId, body = Doc#doc.body},
- fold_replication_logs(
- Rest, ?REP_ID_VERSION, NewId, NewId, Rep, [MigratedLog | Acc])
- end.
-
-
-spawn_changes_reader(StartSeq, #httpdb{} = Db, ChangesQueue, Options) ->
- spawn_link(fun() ->
- put(last_seq, StartSeq),
- put(retries_left, Db#httpdb.retries),
- read_changes(StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options)
- end);
-spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) ->
- spawn_link(fun() ->
- read_changes(StartSeq, Db, ChangesQueue, Options)
- end).
-
-read_changes(StartSeq, Db, ChangesQueue, Options) ->
- try
- couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq,
- fun(#doc_info{high_seq = Seq, id = Id} = DocInfo) ->
- case Id of
- <<>> ->
- % Previous CouchDB releases had a bug which allowed a doc
- % with an empty ID to be inserted into databases. Such a doc
- % is impossible to GET.
- ?LOG_ERROR("Replicator: ignoring document with empty ID in "
- "source database `~s` (_changes sequence ~p)",
- [couch_replicator_api_wrap:db_uri(Db), Seq]);
- _ ->
- ok = couch_work_queue:queue(ChangesQueue, DocInfo)
- end,
- put(last_seq, Seq)
- end, Options),
- couch_work_queue:close(ChangesQueue)
- catch exit:{http_request_failed, _, _, _} = Error ->
- case get(retries_left) of
- N when N > 0 ->
- put(retries_left, N - 1),
- LastSeq = get(last_seq),
- Db2 = case LastSeq of
- StartSeq ->
- ?LOG_INFO("Retrying _changes request to source database ~s"
- " with since=~p in ~p seconds",
- [couch_replicator_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]),
- ok = timer:sleep(Db#httpdb.wait),
- Db#httpdb{wait = 2 * Db#httpdb.wait};
- _ ->
- ?LOG_INFO("Retrying _changes request to source database ~s"
- " with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]),
- Db
- end,
- read_changes(LastSeq, Db2, ChangesQueue, Options);
- _ ->
- exit(Error)
- end
- end.
-
-
-spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
- spawn_link(fun() ->
- changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
- end).
-
-changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts) ->
- receive
- {get_changes, From} ->
- case couch_work_queue:dequeue(ChangesQueue, BatchSize) of
- closed ->
- From ! {closed, self()};
- {ok, Changes} ->
- #doc_info{high_seq = Seq} = lists:last(Changes),
- ReportSeq = {Ts, Seq},
- ok = gen_server:cast(Parent, {report_seq, ReportSeq}),
- From ! {changes, self(), Changes, ReportSeq}
- end,
- changes_manager_loop_open(Parent, ChangesQueue, BatchSize, Ts + 1)
- end.
-
-
-do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
- NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} },
- {ok, NewState};
-do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
- SourceCurSeq = source_cur_seq(State),
- NewState = State#rep_state{source_seq = SourceCurSeq},
- update_task(NewState),
- {ok, NewState};
-do_checkpoint(State) ->
- #rep_state{
- source_name=SourceName,
- target_name=TargetName,
- source = Source,
- target = Target,
- history = OldHistory,
- start_seq = {_, StartSeq},
- current_through_seq = {_Ts, NewSeq} = NewTsSeq,
- source_log = SourceLog,
- target_log = TargetLog,
- rep_starttime = ReplicationStartTime,
- src_starttime = SrcInstanceStartTime,
- tgt_starttime = TgtInstanceStartTime,
- stats = Stats,
- rep_details = #rep{options = Options},
- session_id = SessionId
- } = State,
- case commit_to_both(Source, Target) of
- {source_error, Reason} ->
- {checkpoint_commit_failure,
- <<"Failure on source commit: ", (to_binary(Reason))/binary>>};
- {target_error, Reason} ->
- {checkpoint_commit_failure,
- <<"Failure on target commit: ", (to_binary(Reason))/binary>>};
- {SrcInstanceStartTime, TgtInstanceStartTime} ->
- ?LOG_INFO("recording a checkpoint for `~s` -> `~s` at source update_seq ~p",
- [SourceName, TargetName, NewSeq]),
- StartTime = ?l2b(ReplicationStartTime),
- EndTime = ?l2b(couch_util:rfc1123_date()),
- NewHistoryEntry = {[
- {<<"session_id">>, SessionId},
- {<<"start_time">>, StartTime},
- {<<"end_time">>, EndTime},
- {<<"start_last_seq">>, StartSeq},
- {<<"end_last_seq">>, NewSeq},
- {<<"recorded_seq">>, NewSeq},
- {<<"missing_checked">>, Stats#rep_stats.missing_checked},
- {<<"missing_found">>, Stats#rep_stats.missing_found},
- {<<"docs_read">>, Stats#rep_stats.docs_read},
- {<<"docs_written">>, Stats#rep_stats.docs_written},
- {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
- ]},
- BaseHistory = [
- {<<"session_id">>, SessionId},
- {<<"source_last_seq">>, NewSeq},
- {<<"replication_id_version">>, ?REP_ID_VERSION}
- ] ++ case get_value(doc_ids, Options) of
- undefined ->
- [];
- _DocIds ->
- % backwards compatibility with the result of a replication by
- % doc IDs in versions 0.11.x and 1.0.x
- % TODO: deprecate (use same history format, simplify code)
- [
- {<<"start_time">>, StartTime},
- {<<"end_time">>, EndTime},
- {<<"docs_read">>, Stats#rep_stats.docs_read},
- {<<"docs_written">>, Stats#rep_stats.docs_written},
- {<<"doc_write_failures">>, Stats#rep_stats.doc_write_failures}
- ]
- end,
- % limit history to 50 entries
- NewRepHistory = {
- BaseHistory ++
- [{<<"history">>, lists:sublist([NewHistoryEntry | OldHistory], 50)}]
- },
-
- try
- {SrcRevPos, SrcRevId} = update_checkpoint(
- Source, SourceLog#doc{body = NewRepHistory}, source),
- {TgtRevPos, TgtRevId} = update_checkpoint(
- Target, TargetLog#doc{body = NewRepHistory}, target),
- SourceCurSeq = source_cur_seq(State),
- NewState = State#rep_state{
- source_seq = SourceCurSeq,
- checkpoint_history = NewRepHistory,
- committed_seq = NewTsSeq,
- source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
- target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
- },
- update_task(NewState),
- {ok, NewState}
- catch throw:{checkpoint_commit_failure, _} = Failure ->
- Failure
- end;
- {SrcInstanceStartTime, _NewTgtInstanceStartTime} ->
- {checkpoint_commit_failure, <<"Target database out of sync. "
- "Try to increase max_dbs_open at the target's server.">>};
- {_NewSrcInstanceStartTime, TgtInstanceStartTime} ->
- {checkpoint_commit_failure, <<"Source database out of sync. "
- "Try to increase max_dbs_open at the source's server.">>};
- {_NewSrcInstanceStartTime, _NewTgtInstanceStartTime} ->
- {checkpoint_commit_failure, <<"Source and target databases out of "
- "sync. Try to increase max_dbs_open at both servers.">>}
- end.
-
-
-update_checkpoint(Db, Doc, DbType) ->
- try
- update_checkpoint(Db, Doc)
- catch throw:{checkpoint_commit_failure, Reason} ->
- throw({checkpoint_commit_failure,
- <<"Error updating the ", (to_binary(DbType))/binary,
- " checkpoint document: ", (to_binary(Reason))/binary>>})
- end.
-
-update_checkpoint(Db, #doc{id = LogId, body = LogBody} = Doc) ->
- try
- case couch_replicator_api_wrap:update_doc(Db, Doc, [delay_commit]) of
- {ok, PosRevId} ->
- PosRevId;
- {error, Reason} ->
- throw({checkpoint_commit_failure, Reason})
- end
- catch throw:conflict ->
- case (catch couch_replicator_api_wrap:open_doc(Db, LogId, [ejson_body])) of
- {ok, #doc{body = LogBody, revs = {Pos, [RevId | _]}}} ->
- % This means that we were able to successfully update the
- % checkpoint doc in a previous attempt but got a connection
- % error (e.g. a timeout) before receiving the success response.
- % Therefore the request was retried and we got a conflict, as the
- % revision we sent is not the current one.
- % We confirm this by verifying that the doc body we just got is
- % the same as the one we just sent.
- {Pos, RevId};
- _ ->
- throw({checkpoint_commit_failure, conflict})
- end
- end.
-
-
-commit_to_both(Source, Target) ->
- % commit the src async
- ParentPid = self(),
- SrcCommitPid = spawn_link(
- fun() ->
- Result = (catch couch_replicator_api_wrap:ensure_full_commit(Source)),
- ParentPid ! {self(), Result}
- end),
-
- % commit tgt sync
- TargetResult = (catch couch_replicator_api_wrap:ensure_full_commit(Target)),
-
- SourceResult = receive
- {SrcCommitPid, Result} ->
- unlink(SrcCommitPid),
- receive {'EXIT', SrcCommitPid, _} -> ok after 0 -> ok end,
- Result;
- {'EXIT', SrcCommitPid, Reason} ->
- {error, Reason}
- end,
- case TargetResult of
- {ok, TargetStartTime} ->
- case SourceResult of
- {ok, SourceStartTime} ->
- {SourceStartTime, TargetStartTime};
- SourceError ->
- {source_error, SourceError}
- end;
- TargetError ->
- {target_error, TargetError}
- end.
-
-
-compare_replication_logs(SrcDoc, TgtDoc) ->
- #doc{body={RepRecProps}} = SrcDoc,
- #doc{body={RepRecPropsTgt}} = TgtDoc,
- case get_value(<<"session_id">>, RepRecProps) ==
- get_value(<<"session_id">>, RepRecPropsTgt) of
- true ->
- % if the records have the same session id,
- % then we have a valid replication history
- OldSeqNum = get_value(<<"source_last_seq">>, RepRecProps, ?LOWEST_SEQ),
- OldHistory = get_value(<<"history">>, RepRecProps, []),
- {OldSeqNum, OldHistory};
- false ->
- SourceHistory = get_value(<<"history">>, RepRecProps, []),
- TargetHistory = get_value(<<"history">>, RepRecPropsTgt, []),
- ?LOG_INFO("Replication records differ. "
- "Scanning histories to find a common ancestor.", []),
- ?LOG_DEBUG("Record on source:~p~nRecord on target:~p~n",
- [RepRecProps, RepRecPropsTgt]),
- compare_rep_history(SourceHistory, TargetHistory)
- end.
-
-compare_rep_history(S, T) when S =:= [] orelse T =:= [] ->
- ?LOG_INFO("no common ancestry -- performing full replication", []),
- {?LOWEST_SEQ, []};
-compare_rep_history([{S} | SourceRest], [{T} | TargetRest] = Target) ->
- SourceId = get_value(<<"session_id">>, S),
- case has_session_id(SourceId, Target) of
- true ->
- RecordSeqNum = get_value(<<"recorded_seq">>, S, ?LOWEST_SEQ),
- ?LOG_INFO("found a common replication record with source_seq ~p",
- [RecordSeqNum]),
- {RecordSeqNum, SourceRest};
- false ->
- TargetId = get_value(<<"session_id">>, T),
- case has_session_id(TargetId, SourceRest) of
- true ->
- RecordSeqNum = get_value(<<"recorded_seq">>, T, ?LOWEST_SEQ),
- ?LOG_INFO("found a common replication record with source_seq ~p",
- [RecordSeqNum]),
- {RecordSeqNum, TargetRest};
- false ->
- compare_rep_history(SourceRest, TargetRest)
- end
- end.
-
-
-has_session_id(_SessionId, []) ->
- false;
-has_session_id(SessionId, [{Props} | Rest]) ->
- case get_value(<<"session_id">>, Props, nil) of
- SessionId ->
- true;
- _Else ->
- has_session_id(SessionId, Rest)
- end.
-
-
-db_monitor(#db{} = Db) ->
- couch_db:monitor(Db);
-db_monitor(_HttpDb) ->
- nil.
-
-source_cur_seq(#rep_state{source = #httpdb{} = Db, source_seq = Seq,
- type = view, view = {DDoc, VName}}) ->
- case (catch couch_replicator_api_wrap:get_view_info(
- Db#httpdb{retries = 3}, DDoc, VName)) of
- {ok, Info} ->
- get_value(<<"update_seq">>, Info, Seq);
- _ ->
- Seq
- end;
-
-source_cur_seq(#rep_state{source = Db, source_seq = Seq,
- type = view, view = {DDoc, VName}}) ->
- {ok, Info} = couch_replicator_api_wrap:get_view_info(Db, DDoc, VName),
- get_value(<<"update_seq">>, Info, Seq);
-
-source_cur_seq(#rep_state{source = #httpdb{} = Db, source_seq = Seq}) ->
- case (catch couch_replicator_api_wrap:get_db_info(Db#httpdb{retries = 3})) of
- {ok, Info} ->
- get_value(<<"update_seq">>, Info, Seq);
- _ ->
- Seq
- end;
-source_cur_seq(#rep_state{source = Db, source_seq = Seq}) ->
- {ok, Info} = couch_replicator_api_wrap:get_db_info(Db),
- get_value(<<"update_seq">>, Info, Seq).
-
-
-update_task(State) ->
- #rep_state{
- current_through_seq = {_, CurSeq},
- source_seq = SourceCurSeq
- } = State,
- couch_task_status:update(
- rep_stats(State) ++ [
- {source_seq, SourceCurSeq},
- case is_number(CurSeq) andalso is_number(SourceCurSeq) of
- true ->
- case SourceCurSeq of
- 0 ->
- {progress, 0};
- _ ->
- {progress, (CurSeq * 100) div SourceCurSeq}
- end;
- false ->
- {progress, null}
- end
- ]).
-
-
-rep_stats(State) ->
- #rep_state{
- committed_seq = {_, CommittedSeq},
- stats = Stats
- } = State,
- [
- {revisions_checked, Stats#rep_stats.missing_checked},
- {missing_revisions_found, Stats#rep_stats.missing_found},
- {docs_read, Stats#rep_stats.docs_read},
- {docs_written, Stats#rep_stats.docs_written},
- {doc_write_failures, Stats#rep_stats.doc_write_failures},
- {checkpointed_source_seq, CommittedSeq}
- ].
-
http://git-wip-us.apache.org/repos/asf/couchdb/blob/4c39d7a5/apps/couch_replicator/src/couch_replicator.hrl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator.hrl b/apps/couch_replicator/src/couch_replicator.hrl
deleted file mode 100644
index 1eee88e..0000000
--- a/apps/couch_replicator/src/couch_replicator.hrl
+++ /dev/null
@@ -1,32 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--define(REP_ID_VERSION, 3).
-
--record(rep, {
- id,
- source,
- target,
- options,
- user_ctx,
- type = db,
- view = nil,
- doc_id
-}).
-
--record(rep_stats, {
- missing_checked = 0,
- missing_found = 0,
- docs_read = 0,
- docs_written = 0,
- doc_write_failures = 0
-}).
http://git-wip-us.apache.org/repos/asf/couchdb/blob/4c39d7a5/apps/couch_replicator/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator_api_wrap.erl b/apps/couch_replicator/src/couch_replicator_api_wrap.erl
deleted file mode 100644
index 1e0e660..0000000
--- a/apps/couch_replicator/src/couch_replicator_api_wrap.erl
+++ /dev/null
@@ -1,900 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_api_wrap).
-
-% This module wraps the native Erlang API and allows performing
-% operations on remote and local databases via the same API.
-%
-% Notes:
-% Many options and APIs aren't yet supported here; they are added as needed.
-
--include_lib("couch/include/couch_db.hrl").
--include_lib("couch_mrview/include/couch_mrview.hrl").
--include("couch_replicator_api_wrap.hrl").
-
--export([
- db_open/2,
- db_open/3,
- db_close/1,
- get_db_info/1,
- get_view_info/3,
- update_doc/3,
- update_doc/4,
- update_docs/3,
- update_docs/4,
- ensure_full_commit/1,
- get_missing_revs/2,
- open_doc/3,
- open_doc_revs/6,
- changes_since/5,
- db_uri/1
- ]).
-
--import(couch_replicator_httpc, [
- send_req/3
- ]).
-
--import(couch_util, [
- encode_doc_id/1,
- get_value/2,
- get_value/3
- ]).
-
--define(MAX_WAIT, 5 * 60 * 1000).
-
-db_uri(#httpdb{url = Url}) ->
- couch_util:url_strip_password(Url);
-
-db_uri(#db{name = Name}) ->
- db_uri(Name);
-
-db_uri(DbName) ->
- ?b2l(DbName).
-
-
-db_open(Db, Options) ->
- db_open(Db, Options, false).
-
-db_open(#httpdb{} = Db1, _Options, Create) ->
- {ok, Db} = couch_replicator_httpc:setup(Db1),
- case Create of
- false ->
- ok;
- true ->
- send_req(Db, [{method, put}], fun(_, _, _) -> ok end)
- end,
- send_req(Db, [{method, head}],
- fun(200, _, _) ->
- {ok, Db};
- (401, _, _) ->
- throw({unauthorized, ?l2b(db_uri(Db))});
- (_, _, _) ->
- throw({db_not_found, ?l2b(db_uri(Db))})
- end);
-db_open(DbName, Options, Create) ->
- try
- case Create of
- false ->
- ok;
- true ->
- ok = couch_httpd:verify_is_server_admin(
- get_value(user_ctx, Options)),
- couch_db:create(DbName, Options)
- end,
- case couch_db:open(DbName, Options) of
- {error, illegal_database_name, _} ->
- throw({db_not_found, DbName});
- {not_found, _Reason} ->
- throw({db_not_found, DbName});
- {ok, _Db} = Success ->
- Success
- end
- catch
- throw:{unauthorized, _} ->
- throw({unauthorized, DbName})
- end.
-
-db_close(#httpdb{httpc_pool = Pool}) ->
- unlink(Pool),
- ok = couch_replicator_httpc_pool:stop(Pool);
-db_close(DbName) ->
- catch couch_db:close(DbName).
-
-
-get_db_info(#httpdb{} = Db) ->
- send_req(Db, [],
- fun(200, _, {Props}) ->
- {ok, Props}
- end);
-get_db_info(#db{name = DbName, user_ctx = UserCtx}) ->
- {ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
- {ok, Info} = couch_db:get_db_info(Db),
- couch_db:close(Db),
- {ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}.
-
-
-get_view_info(#httpdb{} = Db, DDocId, ViewName) ->
- Path = iolist_to_binary([DDocId, "/_view/", ViewName, "/_info"]),
- send_req(Db, [{path, Path}],
- fun(200, _, {Props}) ->
- {ok, Props}
- end);
-get_view_info(#db{name = DbName}, DDocId, ViewName) ->
- couch_mrview:get_view_info(DbName, DDocId, ViewName).
-
-
-ensure_full_commit(#httpdb{} = Db) ->
- send_req(
- Db,
- [{method, post}, {path, "_ensure_full_commit"},
- {headers, [{"Content-Type", "application/json"}]}],
- fun(201, _, {Props}) ->
- {ok, get_value(<<"instance_start_time">>, Props)};
- (_, _, {Props}) ->
- {error, get_value(<<"error">>, Props)}
- end);
-ensure_full_commit(Db) ->
- couch_db:ensure_full_commit(Db).
-
-
-get_missing_revs(#httpdb{} = Db, IdRevs) ->
- JsonBody = {[{Id, couch_doc:revs_to_strs(Revs)} || {Id, Revs} <- IdRevs]},
- send_req(
- Db,
- [{method, post}, {path, "_revs_diff"}, {body, ?JSON_ENCODE(JsonBody)},
- {headers, [{"Content-Type", "application/json"}]}],
- fun(200, _, {Props}) ->
- ConvertToNativeFun = fun({Id, {Result}}) ->
- MissingRevs = couch_doc:parse_revs(
- get_value(<<"missing">>, Result)
- ),
- PossibleAncestors = couch_doc:parse_revs(
- get_value(<<"possible_ancestors">>, Result, [])
- ),
- {Id, MissingRevs, PossibleAncestors}
- end,
- {ok, lists:map(ConvertToNativeFun, Props)}
- end);
-get_missing_revs(Db, IdRevs) ->
- couch_db:get_missing_revs(Db, IdRevs).
-
-
-
-open_doc_revs(#httpdb{retries = 0} = HttpDb, Id, Revs, Options, _Fun, _Acc) ->
- Path = encode_doc_id(Id),
- QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]),
- Url = couch_util:url_strip_password(
- couch_replicator_httpc:full_url(HttpDb, [{path,Path}, {qs,QS}])
- ),
- ?LOG_ERROR("Replication crashing because GET ~s failed", [Url]),
- exit(kaboom);
-open_doc_revs(#httpdb{} = HttpDb, Id, Revs, Options, Fun, Acc) ->
- Path = encode_doc_id(Id),
- QS = options_to_query_args(HttpDb, Path, [revs, {open_revs, Revs} | Options]),
- {Pid, Ref} = spawn_monitor(fun() ->
- Self = self(),
- Callback = fun(200, Headers, StreamDataFun) ->
- remote_open_doc_revs_streamer_start(Self),
- {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
- get_value("Content-Type", Headers),
- StreamDataFun,
- fun mp_parse_mixed/1
- )
- end,
- Streamer = spawn_link(fun() ->
- Params = [
- {path, Path},
- {qs, QS},
- {ibrowse_options, [{stream_to, {self(), once}}]},
- {headers, [{"Accept", "multipart/mixed"}]}
- ],
- % We're setting retries to 0 here to avoid the case where the
- % Streamer retries the request and ends up jumbling together two
- % different response bodies. Retries are handled explicitly by
- % open_doc_revs itself.
- send_req(HttpDb#httpdb{retries = 0}, Params, Callback)
- end),
- % If this process dies normally we could leave
- % the Streamer process hanging around, keeping an
- % HTTP connection open. This is a bit of a
- % hammer approach to making sure it releases
- % that connection back to the pool.
- spawn(fun() ->
- Ref = erlang:monitor(process, Self),
- receive
- {'DOWN', Ref, process, Self, normal} ->
- exit(Streamer, {streamer_parent_died, Self});
- {'DOWN', Ref, process, Self, _} ->
- ok
- end
- end),
- receive
- {started_open_doc_revs, Ref} ->
- Ret = receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc),
- exit({exit_ok, Ret})
- end
- end),
- receive
- {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
- Ret;
- {'DOWN', Ref, process, Pid, {{nocatch, {missing_stub,_} = Stub}, _}} ->
- throw(Stub);
- {'DOWN', Ref, process, Pid, Else} ->
- Url = couch_util:url_strip_password(
- couch_replicator_httpc:full_url(HttpDb, [{path,Path}, {qs,QS}])
- ),
- #httpdb{retries = Retries, wait = Wait0} = HttpDb,
- Wait = 2 * erlang:min(Wait0 * 2, ?MAX_WAIT),
- ?LOG_INFO("Retrying GET to ~s in ~p seconds due to error ~p",
- [Url, Wait / 1000, error_reason(Else)]
- ),
- ok = timer:sleep(Wait),
- RetryDb = HttpDb#httpdb{
- retries = Retries - 1,
- wait = Wait
- },
- open_doc_revs(RetryDb, Id, Revs, Options, Fun, Acc)
- end;
-open_doc_revs(Db, Id, Revs, Options, Fun, Acc) ->
- {ok, Results} = couch_db:open_doc_revs(Db, Id, Revs, Options),
- {ok, lists:foldl(fun(R, A) -> {_, A2} = Fun(R, A), A2 end, Acc, Results)}.
-
-error_reason({http_request_failed, "GET", _Url, {error, timeout}}) ->
- timeout;
-error_reason({http_request_failed, "GET", _Url, {error, {_, req_timedout}}}) ->
- req_timedout;
-error_reason({http_request_failed, "GET", _Url, Error}) ->
- Error;
-error_reason(Else) ->
- Else.
-
-open_doc(#httpdb{} = Db, Id, Options) ->
- send_req(
- Db,
- [{path, encode_doc_id(Id)}, {qs, options_to_query_args(Options, [])}],
- fun(200, _, Body) ->
- {ok, couch_doc:from_json_obj(Body)};
- (_, _, {Props}) ->
- {error, get_value(<<"error">>, Props)}
- end);
-open_doc(Db, Id, Options) ->
- case couch_db:open_doc(Db, Id, Options) of
- {ok, _} = Ok ->
- Ok;
- {not_found, _Reason} ->
- {error, <<"not_found">>}
- end.
-
-
-update_doc(Db, Doc, Options) ->
- update_doc(Db, Doc, Options, interactive_edit).
-
-update_doc(#httpdb{} = HttpDb, #doc{id = DocId} = Doc, Options, Type) ->
- QArgs = case Type of
- replicated_changes ->
- [{"new_edits", "false"}];
- _ ->
- []
- end ++ options_to_query_args(Options, []),
- Boundary = couch_uuids:random(),
- JsonBytes = ?JSON_ENCODE(
- couch_doc:to_json_obj(
- Doc, [revs, attachments, follows, att_encoding_info | Options])),
- {ContentType, Len} = couch_doc:len_doc_to_multi_part_stream(Boundary,
- JsonBytes, Doc#doc.atts, true),
- Headers = case lists:member(delay_commit, Options) of
- true ->
- [{"X-Couch-Full-Commit", "false"}];
- false ->
- []
- end ++ [{"Content-Type", ?b2l(ContentType)}, {"Content-Length", Len}],
- Body = {fun stream_doc/1, {JsonBytes, Doc#doc.atts, Boundary, Len}},
- send_req(
- % A crash here bubbles all the way back up to run_user_fun inside
- % open_doc_revs, which will retry the whole thing. That's the
- % appropriate course of action, since we've already started streaming
- % the response body from the GET request.
- HttpDb#httpdb{retries = 0},
- [{method, put}, {path, encode_doc_id(DocId)},
- {qs, QArgs}, {headers, Headers}, {body, Body}],
- fun(Code, _, {Props}) when Code =:= 200 orelse Code =:= 201 ->
- {ok, couch_doc:parse_rev(get_value(<<"rev">>, Props))};
- (409, _, _) ->
- throw(conflict);
- (Code, _, {Props}) ->
- case {Code, get_value(<<"error">>, Props)} of
- {401, <<"unauthorized">>} ->
- throw({unauthorized, get_value(<<"reason">>, Props)});
- {403, <<"forbidden">>} ->
- throw({forbidden, get_value(<<"reason">>, Props)});
- {412, <<"missing_stub">>} ->
- throw({missing_stub, get_value(<<"reason">>, Props)});
- {_, Error} ->
- {error, Error}
- end
- end);
-update_doc(Db, Doc, Options, Type) ->
- couch_db:update_doc(Db, Doc, Options, Type).
-
-
-update_docs(Db, DocList, Options) ->
- update_docs(Db, DocList, Options, interactive_edit).
-
-update_docs(_Db, [], _Options, _UpdateType) ->
- {ok, []};
-update_docs(#httpdb{} = HttpDb, DocList, Options, UpdateType) ->
- FullCommit = atom_to_list(not lists:member(delay_commit, Options)),
- Prefix = case UpdateType of
- replicated_changes ->
- <<"{\"new_edits\":false,\"docs\":[">>;
- interactive_edit ->
- <<"{\"docs\":[">>
- end,
- Suffix = <<"]}">>,
- % Note: nginx and other servers don't like PUT/POST requests without
- % a Content-Length header, so we can't use chunked transfer encoding
- % and JSON-encode each doc just before sending it through the socket.
- {Docs, Len} = lists:mapfoldl(
- fun(#doc{} = Doc, Acc) ->
- Json = ?JSON_ENCODE(couch_doc:to_json_obj(Doc, [revs, attachments])),
- {Json, Acc + iolist_size(Json)};
- (Doc, Acc) ->
- {Doc, Acc + iolist_size(Doc)}
- end,
- byte_size(Prefix) + byte_size(Suffix) + length(DocList) - 1,
- DocList),
- BodyFun = fun(eof) ->
- eof;
- ([]) ->
- {ok, Suffix, eof};
- ([prefix | Rest]) ->
- {ok, Prefix, Rest};
- ([Doc]) ->
- {ok, Doc, []};
- ([Doc | RestDocs]) ->
- {ok, [Doc, ","], RestDocs}
- end,
- Headers = [
- {"Content-Length", Len},
- {"Content-Type", "application/json"},
- {"X-Couch-Full-Commit", FullCommit}
- ],
- send_req(
- HttpDb,
- [{method, post}, {path, "_bulk_docs"},
- {body, {BodyFun, [prefix | Docs]}}, {headers, Headers}],
- fun(201, _, Results) when is_list(Results) ->
- {ok, bulk_results_to_errors(DocList, Results, remote)};
- (417, _, Results) when is_list(Results) ->
- {ok, bulk_results_to_errors(DocList, Results, remote)}
- end);
-update_docs(Db, DocList, Options, UpdateType) ->
- Result = couch_db:update_docs(Db, DocList, Options, UpdateType),
- {ok, bulk_results_to_errors(DocList, Result, UpdateType)}.
-
-
-changes_since(#httpdb{headers = Headers1} = HttpDb, Style, StartSeq,
- UserFun, Options) ->
- HeartBeat = erlang:max(1000, HttpDb#httpdb.timeout div 3),
- BaseQArgs = case get_value(continuous, Options, false) of
- false ->
- [{"feed", "normal"}];
- true ->
- [{"feed", "continuous"}]
- end ++ [
- {"style", atom_to_list(Style)}, {"since", ?JSON_ENCODE(StartSeq)},
- {"heartbeat", integer_to_list(HeartBeat)}
- ],
- DocIds = get_value(doc_ids, Options),
- {QArgs, Method, Body, Headers} = case DocIds of
- undefined ->
- QArgs1 = maybe_add_changes_filter_q_args(BaseQArgs, Options),
- {QArgs1, get, [], Headers1};
- _ when is_list(DocIds) ->
- Headers2 = [{"Content-Type", "application/json"} | Headers1],
- JsonDocIds = ?JSON_ENCODE({[{<<"doc_ids">>, DocIds}]}),
- {[{"filter", "_doc_ids"} | BaseQArgs], post, JsonDocIds, Headers2}
- end,
- send_req(
- HttpDb,
- [{method, Method}, {path, "_changes"}, {qs, QArgs},
- {headers, Headers}, {body, Body},
- {ibrowse_options, [{stream_to, {self(), once}}]}],
- fun(200, _, DataStreamFun) ->
- parse_changes_feed(Options, UserFun, DataStreamFun);
- (405, _, _) when is_list(DocIds) ->
- % CouchDB versions < 1.1.0 don't have the built-in _changes feed
- % filter "_doc_ids", nor do they support POST
- send_req(HttpDb, [{method, get}, {path, "_changes"},
- {qs, BaseQArgs}, {headers, Headers1},
- {ibrowse_options, [{stream_to, {self(), once}}]}],
- fun(200, _, DataStreamFun2) ->
- UserFun2 = fun(#doc_info{id = Id} = DocInfo) ->
- case lists:member(Id, DocIds) of
- true ->
- UserFun(DocInfo);
- false ->
- ok
- end
- end,
- parse_changes_feed(Options, UserFun2, DataStreamFun2)
- end)
- end);
-changes_since(Db, Style, StartSeq, UserFun, Options) ->
- Filter = case get_value(doc_ids, Options) of
- undefined ->
- ?b2l(get_value(filter, Options, <<>>));
- _DocIds ->
- "_doc_ids"
- end,
- Args = #changes_args{
- style = Style,
- since = StartSeq,
- filter = Filter,
- feed = case get_value(continuous, Options, false) of
- true ->
- "continuous";
- false ->
- "normal"
- end,
- timeout = infinity
- },
- QueryParams = get_value(query_params, Options, {[]}),
- Req = changes_json_req(Db, Filter, QueryParams, Options),
- ChangesFeedFun = couch_httpd_changes:handle_changes(Args, {json_req, Req},
- Db),
- ChangesFeedFun(fun({change, Change, _}, _) ->
- UserFun(json_to_doc_info(Change));
- (_, _) ->
- ok
- end).
-
-
-% internal functions
-
-maybe_add_changes_filter_q_args(BaseQS, Options) ->
- case get_value(filter, Options) of
- undefined ->
- BaseQS;
- FilterName ->
- %% get list of view attributes
- ViewFields0 = [atom_to_list(F) || F <- record_info(fields, mrargs)],
- ViewFields = ["key" | ViewFields0],
-
- {Params} = get_value(query_params, Options, {[]}),
- [{"filter", ?b2l(FilterName)} | lists:foldl(
- fun({K, V}, QSAcc) ->
- Ks = couch_util:to_list(K),
- case lists:keymember(Ks, 1, QSAcc) of
- true ->
- QSAcc;
- false when FilterName =:= <<"_view">> ->
- V1 = case lists:member(Ks, ViewFields) of
- true -> ?JSON_ENCODE(V);
- false -> couch_util:to_list(V)
- end,
- [{Ks, V1} | QSAcc];
- false ->
- [{Ks, couch_util:to_list(V)} | QSAcc]
- end
- end,
- BaseQS, Params)]
- end.
-
-parse_changes_feed(Options, UserFun, DataStreamFun) ->
- case get_value(continuous, Options, false) of
- true ->
- continuous_changes(DataStreamFun, UserFun);
- false ->
- EventFun = fun(Ev) ->
- changes_ev1(Ev, fun(DocInfo, _) -> UserFun(DocInfo) end, [])
- end,
- json_stream_parse:events(DataStreamFun, EventFun)
- end.
-
-changes_json_req(_Db, "", _QueryParams, _Options) ->
- {[]};
-changes_json_req(_Db, "_doc_ids", _QueryParams, Options) ->
- {[{<<"doc_ids">>, get_value(doc_ids, Options)}]};
-changes_json_req(Db, FilterName, {QueryParams}, _Options) ->
- {ok, Info} = couch_db:get_db_info(Db),
- % simulate a request to db_name/_changes
- {[
- {<<"info">>, {Info}},
- {<<"id">>, null},
- {<<"method">>, 'GET'},
- {<<"path">>, [couch_db:name(Db), <<"_changes">>]},
- {<<"query">>, {[{<<"filter">>, FilterName} | QueryParams]}},
- {<<"headers">>, []},
- {<<"body">>, []},
- {<<"peer">>, <<"replicator">>},
- {<<"form">>, []},
- {<<"cookie">>, []},
- {<<"userCtx">>, couch_util:json_user_ctx(Db)}
- ]}.
-
-
-options_to_query_args(HttpDb, Path, Options) ->
- case lists:keytake(atts_since, 1, Options) of
- false ->
- options_to_query_args(Options, []);
- {value, {atts_since, []}, Options2} ->
- options_to_query_args(Options2, []);
- {value, {atts_since, PAs}, Options2} ->
- QueryArgs1 = options_to_query_args(Options2, []),
- FullUrl = couch_replicator_httpc:full_url(
- HttpDb, [{path, Path}, {qs, QueryArgs1}]),
- RevList = atts_since_arg(
- length("GET " ++ FullUrl ++ " HTTP/1.1\r\n") +
- length("&atts_since=") + 6, % +6 = % encoded [ and ]
- PAs, []),
- [{"atts_since", ?JSON_ENCODE(RevList)} | QueryArgs1]
- end.
-
-
-options_to_query_args([], Acc) ->
- lists:reverse(Acc);
-options_to_query_args([ejson_body | Rest], Acc) ->
- options_to_query_args(Rest, Acc);
-options_to_query_args([delay_commit | Rest], Acc) ->
- options_to_query_args(Rest, Acc);
-options_to_query_args([revs | Rest], Acc) ->
- options_to_query_args(Rest, [{"revs", "true"} | Acc]);
-options_to_query_args([{open_revs, all} | Rest], Acc) ->
- options_to_query_args(Rest, [{"open_revs", "all"} | Acc]);
-options_to_query_args([latest | Rest], Acc) ->
- options_to_query_args(Rest, [{"latest", "true"} | Acc]);
-options_to_query_args([{open_revs, Revs} | Rest], Acc) ->
- JsonRevs = ?b2l(?JSON_ENCODE(couch_doc:revs_to_strs(Revs))),
- options_to_query_args(Rest, [{"open_revs", JsonRevs} | Acc]).
-
-
--define(MAX_URL_LEN, 7000).
-
-atts_since_arg(_UrlLen, [], Acc) ->
- lists:reverse(Acc);
-atts_since_arg(UrlLen, [PA | Rest], Acc) ->
- RevStr = couch_doc:rev_to_str(PA),
- NewUrlLen = case Rest of
- [] ->
- % plus 2 double quotes (% encoded)
- UrlLen + size(RevStr) + 6;
- _ ->
- % plus 2 double quotes and a comma (% encoded)
- UrlLen + size(RevStr) + 9
- end,
- case NewUrlLen >= ?MAX_URL_LEN of
- true ->
- lists:reverse(Acc);
- false ->
- atts_since_arg(NewUrlLen, Rest, [RevStr | Acc])
- end.
-
-
-% TODO: A less verbose, more elegant and automatic restart strategy for
-% the exported open_doc_revs/6 function. The restart should be
-% transparent to the caller like any other Couch API function exported
-% by this module.
-receive_docs_loop(Streamer, Fun, Id, Revs, Ref, Acc) ->
- try
- % Left only for debugging purposes via an interactive or remote shell
- erlang:put(open_doc_revs, {Id, Revs, Ref, Streamer}),
- receive_docs(Streamer, Fun, Ref, Acc)
- catch
- error:{restart_open_doc_revs, NewRef} ->
- receive_docs_loop(Streamer, Fun, Id, Revs, NewRef, Acc)
- end.
-
-receive_docs(Streamer, UserFun, Ref, UserAcc) ->
- Streamer ! {get_headers, Ref, self()},
- receive
- {started_open_doc_revs, NewRef} ->
- restart_remote_open_doc_revs(Ref, NewRef);
- {headers, Ref, Headers} ->
- case get_value("content-type", Headers) of
- {"multipart/related", _} = ContentType ->
- case doc_from_multi_part_stream(
- ContentType,
- fun() -> receive_doc_data(Streamer, Ref) end,
- Ref) of
- {ok, Doc, Parser} ->
- case run_user_fun(UserFun, {ok, Doc}, UserAcc, Ref) of
- {ok, UserAcc2} ->
- ok;
- {skip, UserAcc2} ->
- couch_doc:abort_multi_part_stream(Parser)
- end,
- receive_docs(Streamer, UserFun, Ref, UserAcc2)
- end;
- {"application/json", []} ->
- Doc = couch_doc:from_json_obj(
- ?JSON_DECODE(receive_all(Streamer, Ref, []))),
- {_, UserAcc2} = run_user_fun(UserFun, {ok, Doc}, UserAcc, Ref),
- receive_docs(Streamer, UserFun, Ref, UserAcc2);
- {"application/json", [{"error","true"}]} ->
- {ErrorProps} = ?JSON_DECODE(receive_all(Streamer, Ref, [])),
- Rev = get_value(<<"missing">>, ErrorProps),
- Result = {{not_found, missing}, couch_doc:parse_rev(Rev)},
- {_, UserAcc2} = run_user_fun(UserFun, Result, UserAcc, Ref),
- receive_docs(Streamer, UserFun, Ref, UserAcc2)
- end;
- {done, Ref} ->
- {ok, UserAcc}
- end.
-
-
-run_user_fun(UserFun, Arg, UserAcc, OldRef) ->
- {Pid, Ref} = spawn_monitor(fun() ->
- try UserFun(Arg, UserAcc) of
- Resp ->
- exit({exit_ok, Resp})
- catch
- throw:Reason ->
- exit({exit_throw, Reason});
- error:Reason ->
- exit({exit_error, Reason});
- exit:Reason ->
- exit({exit_exit, Reason})
- end
- end),
- receive
- {started_open_doc_revs, NewRef} ->
- erlang:demonitor(Ref, [flush]),
- exit(Pid, kill),
- restart_remote_open_doc_revs(OldRef, NewRef);
- {'DOWN', Ref, process, Pid, {exit_ok, Ret}} ->
- Ret;
- {'DOWN', Ref, process, Pid, {exit_throw, Reason}} ->
- throw(Reason);
- {'DOWN', Ref, process, Pid, {exit_error, Reason}} ->
- erlang:error(Reason);
- {'DOWN', Ref, process, Pid, {exit_exit, Reason}} ->
- erlang:exit(Reason)
- end.
-
-
-restart_remote_open_doc_revs(Ref, NewRef) ->
- receive
- {body_bytes, Ref, _} ->
- restart_remote_open_doc_revs(Ref, NewRef);
- {body_done, Ref} ->
- restart_remote_open_doc_revs(Ref, NewRef);
- {done, Ref} ->
- restart_remote_open_doc_revs(Ref, NewRef);
- {headers, Ref, _} ->
- restart_remote_open_doc_revs(Ref, NewRef)
- after 0 ->
- erlang:error({restart_open_doc_revs, NewRef})
- end.
-
-
-remote_open_doc_revs_streamer_start(Parent) ->
- receive
- {get_headers, _Ref, Parent} ->
- remote_open_doc_revs_streamer_start(Parent);
- {next_bytes, _Ref, Parent} ->
- remote_open_doc_revs_streamer_start(Parent)
- after 0 ->
- Parent ! {started_open_doc_revs, make_ref()}
- end.
-
-
-receive_all(Streamer, Ref, Acc) ->
- Streamer ! {next_bytes, Ref, self()},
- receive
- {started_open_doc_revs, NewRef} ->
- restart_remote_open_doc_revs(Ref, NewRef);
- {body_bytes, Ref, Bytes} ->
- receive_all(Streamer, Ref, [Bytes | Acc]);
- {body_done, Ref} ->
- lists:reverse(Acc)
- end.
-
-
-mp_parse_mixed(eof) ->
- receive {get_headers, Ref, From} ->
- From ! {done, Ref}
- end;
-mp_parse_mixed({headers, H}) ->
- receive {get_headers, Ref, From} ->
- From ! {headers, Ref, H}
- end,
- fun mp_parse_mixed/1;
-mp_parse_mixed({body, Bytes}) ->
- receive {next_bytes, Ref, From} ->
- From ! {body_bytes, Ref, Bytes}
- end,
- fun mp_parse_mixed/1;
-mp_parse_mixed(body_end) ->
- receive {next_bytes, Ref, From} ->
- From ! {body_done, Ref};
- {get_headers, Ref, From} ->
- self() ! {get_headers, Ref, From}
- end,
- fun mp_parse_mixed/1.
-
-
-receive_doc_data(Streamer, Ref) ->
- Streamer ! {next_bytes, Ref, self()},
- receive
- {body_bytes, Ref, Bytes} ->
- {Bytes, fun() -> receive_doc_data(Streamer, Ref) end};
- {body_done, Ref} ->
- {<<>>, fun() -> receive_doc_data(Streamer, Ref) end}
- end.
-
-doc_from_multi_part_stream(ContentType, DataFun, Ref) ->
- Self = self(),
- Parser = spawn_link(fun() ->
- {<<"--">>, _, _} = couch_httpd:parse_multipart_request(
- ContentType, DataFun,
- fun(Next) -> couch_doc:mp_parse_doc(Next, []) end),
- unlink(Self)
- end),
- Parser ! {get_doc_bytes, Ref, self()},
- receive
- {started_open_doc_revs, NewRef} ->
- unlink(Parser),
- exit(Parser, kill),
- restart_remote_open_doc_revs(Ref, NewRef);
- {doc_bytes, Ref, DocBytes} ->
- Doc = couch_doc:from_json_obj(?JSON_DECODE(DocBytes)),
- ReadAttachmentDataFun = fun() ->
- link(Parser),
- Parser ! {get_bytes, Ref, self()},
- receive
- {started_open_doc_revs, NewRef} ->
- unlink(Parser),
- exit(Parser, kill),
- receive {bytes, Ref, _} -> ok after 0 -> ok end,
- restart_remote_open_doc_revs(Ref, NewRef);
- {bytes, Ref, Bytes} ->
- Bytes
- end
- end,
- Atts2 = lists:map(
- fun(#att{data = follows} = A) ->
- A#att{data = ReadAttachmentDataFun};
- (A) ->
- A
- end, Doc#doc.atts),
- {ok, Doc#doc{atts = Atts2}, Parser}
- end.
-
-
-changes_ev1(object_start, UserFun, UserAcc) ->
- fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
-
-changes_ev2({key, <<"results">>}, UserFun, UserAcc) ->
- fun(Ev) -> changes_ev3(Ev, UserFun, UserAcc) end;
-changes_ev2(_, UserFun, UserAcc) ->
- fun(Ev) -> changes_ev2(Ev, UserFun, UserAcc) end.
-
-changes_ev3(array_start, UserFun, UserAcc) ->
- fun(Ev) -> changes_ev_loop(Ev, UserFun, UserAcc) end.
-
-changes_ev_loop(object_start, UserFun, UserAcc) ->
- fun(Ev) ->
- json_stream_parse:collect_object(Ev,
- fun(Obj) ->
- UserAcc2 = UserFun(json_to_doc_info(Obj), UserAcc),
- fun(Ev2) -> changes_ev_loop(Ev2, UserFun, UserAcc2) end
- end)
- end;
-changes_ev_loop(array_end, _UserFun, _UserAcc) ->
- fun(_Ev) -> changes_ev_done() end.
-
-changes_ev_done() ->
- fun(_Ev) -> changes_ev_done() end.
-
-continuous_changes(DataFun, UserFun) ->
- {DataFun2, _, Rest} = json_stream_parse:events(
- DataFun,
- fun(Ev) -> parse_changes_line(Ev, UserFun) end),
- continuous_changes(fun() -> {Rest, DataFun2} end, UserFun).
-
-parse_changes_line(object_start, UserFun) ->
- fun(Ev) ->
- json_stream_parse:collect_object(Ev,
- fun(Obj) -> UserFun(json_to_doc_info(Obj)) end)
- end.
-
-json_to_doc_info({Props}) ->
- RevsInfo = lists:map(
- fun({Change}) ->
- Rev = couch_doc:parse_rev(get_value(<<"rev">>, Change)),
- Del = (true =:= get_value(<<"deleted">>, Change)),
- #rev_info{rev=Rev, deleted=Del}
- end, get_value(<<"changes">>, Props)),
- #doc_info{
- id = get_value(<<"id">>, Props),
- high_seq = get_value(<<"seq">>, Props),
- revs = RevsInfo
- }.
-
-
-bulk_results_to_errors(Docs, {ok, Results}, interactive_edit) ->
- lists:reverse(lists:foldl(
- fun({_, {ok, _}}, Acc) ->
- Acc;
- ({#doc{id = Id, revs = {Pos, [RevId | _]}}, Error}, Acc) ->
- {_, Error, Reason} = couch_httpd:error_info(Error),
- [ {[{id, Id}, {rev, rev_to_str({Pos, RevId})},
- {error, Error}, {reason, Reason}]} | Acc ]
- end,
- [], lists:zip(Docs, Results)));
-
-bulk_results_to_errors(Docs, {ok, Results}, replicated_changes) ->
- bulk_results_to_errors(Docs, {aborted, Results}, interactive_edit);
-
-bulk_results_to_errors(_Docs, {aborted, Results}, interactive_edit) ->
- lists:map(
- fun({{Id, Rev}, Err}) ->
- {_, Error, Reason} = couch_httpd:error_info(Err),
- {[{id, Id}, {rev, rev_to_str(Rev)}, {error, Error}, {reason, Reason}]}
- end,
- Results);
-
-bulk_results_to_errors(_Docs, Results, remote) ->
- lists:reverse(lists:foldl(
- fun({Props}, Acc) ->
- case get_value(<<"error">>, Props, get_value(error, Props)) of
- undefined ->
- Acc;
- Error ->
- Id = get_value(<<"id">>, Props, get_value(id, Props)),
- Rev = get_value(<<"rev">>, Props, get_value(rev, Props)),
- Reason = get_value(<<"reason">>, Props, get_value(reason, Props)),
- [ {[{id, Id}, {rev, rev_to_str(Rev)},
- {error, Error}, {reason, Reason}]} | Acc ]
- end
- end,
- [], Results)).
-
-
-rev_to_str({_Pos, _Id} = Rev) ->
- couch_doc:rev_to_str(Rev);
-rev_to_str(Rev) ->
- Rev.
-
-write_fun() ->
- fun(Data) ->
- receive {get_data, Ref, From} ->
- From ! {data, Ref, Data}
- end
- end.
-
-stream_doc({JsonBytes, Atts, Boundary, Len}) ->
- case erlang:erase({doc_streamer, Boundary}) of
- Pid when is_pid(Pid) ->
- unlink(Pid),
- exit(Pid, kill);
- _ ->
- ok
- end,
- DocStreamer = spawn_link(
- couch_doc,
- doc_to_multi_part_stream,
- [Boundary, JsonBytes, Atts, write_fun(), true]
- ),
- erlang:put({doc_streamer, Boundary}, DocStreamer),
- {ok, <<>>, {Len, Boundary}};
-stream_doc({0, Id}) ->
- erlang:erase({doc_streamer, Id}),
- eof;
-stream_doc({LenLeft, Id}) when LenLeft > 0 ->
- Ref = make_ref(),
- erlang:get({doc_streamer, Id}) ! {get_data, Ref, self()},
- receive {data, Ref, Data} ->
- {ok, Data, {LenLeft - iolist_size(Data), Id}}
- end.
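For reference, a hedged trace of the option handling removed above: options_to_query_args/2 drops ejson_body and delay_commit and turns the remaining replication options into query-string pairs. The input list below is an assumed example and the helper is internal to the module, so the shell call is hypothetical; the result follows directly from the deleted clauses.

    %% Assumed example input; hypothetical shell call (the helper is not
    %% exported). ejson_body is dropped, the rest become query-string pairs.
    1> options_to_query_args([revs, latest, {open_revs, all}, ejson_body], []).
    [{"revs","true"},{"latest","true"},{"open_revs","all"}]
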
http://git-wip-us.apache.org/repos/asf/couchdb/blob/4c39d7a5/apps/couch_replicator/src/couch_replicator_api_wrap.hrl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator_api_wrap.hrl b/apps/couch_replicator/src/couch_replicator_api_wrap.hrl
deleted file mode 100644
index 1a6f27a..0000000
--- a/apps/couch_replicator/src/couch_replicator_api_wrap.hrl
+++ /dev/null
@@ -1,36 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
-
-
--record(httpdb, {
- url,
- oauth = nil,
- headers = [
- {"Accept", "application/json"},
- {"User-Agent", "CouchDB/" ++ couch_server:get_version()}
- ],
- timeout, % milliseconds
- ibrowse_options = [],
- retries = 10,
- wait = 250, % milliseconds
- httpc_pool = nil,
- http_connections
-}).
-
--record(oauth, {
- consumer_key,
- token,
- token_secret,
- consumer_secret,
- signature_method
-}).
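The #httpdb record removed above carried the per-endpoint HTTP settings the replicator used (URL, OAuth credentials, default headers, timeout, retry/backoff and connection-pool sizing). A minimal fragment sketching its construction follows; it assumes a module that includes the deleted header, and the URL, timeout and pool size are assumed example values rather than defaults from this commit.

    %% Sketch only; assumes the removed couch_replicator_api_wrap.hrl is
    %% included by the enclosing module.
    -include("couch_replicator_api_wrap.hrl").

    example_httpdb() ->
        #httpdb{
            url = "http://example.org:5984/source_db/", % assumed URL
            timeout = 30000,                            % ms, assumed
            http_connections = 20                       % assumed pool size
        }.
    %% retries = 10, wait = 250 ms and the Accept/User-Agent headers keep the
    %% defaults from the record definition above.
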
http://git-wip-us.apache.org/repos/asf/couchdb/blob/4c39d7a5/apps/couch_replicator/src/couch_replicator_app.erl
----------------------------------------------------------------------
diff --git a/apps/couch_replicator/src/couch_replicator_app.erl b/apps/couch_replicator/src/couch_replicator_app.erl
deleted file mode 100644
index 4083db6..0000000
--- a/apps/couch_replicator/src/couch_replicator_app.erl
+++ /dev/null
@@ -1,29 +0,0 @@
-% Licensed under the Apache License, Version 2.0 (the "License"); you may not
-% use this file except in compliance with the License. You may obtain a copy of
-% the License at
-%
-% http://www.apache.org/licenses/LICENSE-2.0
-%
-% Unless required by applicable law or agreed to in writing, software
-% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-% License for the specific language governing permissions and limitations under
-% the License.
-
--module(couch_replicator_app).
-
--behaviour(application).
-
-%% Application callbacks
--export([start/2, stop/1]).
-
-%% ===================================================================
-%% Application callbacks
-%% ===================================================================
-
-start(_StartType, _StartArgs) ->
- couch_util:start_app_deps(couch_replicator),
- couch_replicator_sup:start_link().
-
-stop(_State) ->
- ok.
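couch_replicator_app above is a standard OTP application callback: start/2 starts the dependency applications via couch_util:start_app_deps/1 and then the top supervisor couch_replicator_sup, and stop/1 is a no-op. Before this commit the replicator therefore started like any other application; a hedged shell sketch follows (release and boot configuration assumed).

    %% Hypothetical shell session; assumes a release that still ships the
    %% couch_replicator application and its dependencies.
    1> application:start(couch_replicator).
    ok
    %% This calls couch_replicator_app:start/2 above, which runs
    %% couch_util:start_app_deps(couch_replicator) and then
    %% couch_replicator_sup:start_link().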