You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:15:45 UTC
[14/37] couch-replicator commit: updated refs/heads/master to aafb5f9
Report a correct value for changes_pending
This uses the work done in BugzId: 24236 that provides a new pending
field in _changes feeds. The source _changes is polled with a
`?since=seq&limit=0` query string to get the remaining changes to be
processed for a given sequence.
BugzId: 26015
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/756f500b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/756f500b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/756f500b
Branch: refs/heads/master
Commit: 756f500b3194ddf7b4ccbb9421024cec055f9ee3
Parents: 0c095c0
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Dec 10 15:43:38 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:04:05 2014 +0100
----------------------------------------------------------------------
src/couch_replicator.erl | 58 +++++++++++++---------------------
src/couch_replicator_api_wrap.erl | 15 +++++++++
2 files changed, 37 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/756f500b/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index ef44c54..8041283 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -240,8 +240,8 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
source_name = SourceName,
target_name = TargetName,
start_seq = {_Ts, StartSeq},
- source_seq = SourceCurSeq,
committed_seq = {_, CommittedSeq},
+ highest_seq_done = {_, HighestSeq},
checkpoint_interval = CheckpointInterval
} = State = init_state(Rep),
@@ -283,9 +283,9 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
{missing_revisions_found, 0},
{docs_read, 0},
{docs_written, 0},
- {changes_pending, pending(SourceCurSeq, CommittedSeq)},
+ {changes_pending, get_pending_count(State)},
{doc_write_failures, 0},
- {source_seq, SourceCurSeq},
+ {source_seq, HighestSeq},
{checkpointed_source_seq, CommittedSeq},
{progress, 0},
{checkpoint_interval, CheckpointInterval}
@@ -431,13 +431,11 @@ handle_call({report_seq_done, Seq, StatsInc}, From,
"Seqs in progress were: ~p~nSeqs in progress are now: ~p",
[Seq, ThroughSeq, NewThroughSeq, HighestDone,
NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
- SourceCurSeq = source_cur_seq(State),
NewState = State#rep_state{
stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
current_through_seq = NewThroughSeq,
seqs_in_progress = NewSeqsInProgress,
- highest_seq_done = NewHighestDone,
- source_seq = SourceCurSeq
+ highest_seq_done = NewHighestDone
},
update_task(NewState),
{noreply, NewState}.
@@ -589,7 +587,6 @@ init_state(Rep) ->
start_db_compaction_notifier(Target, self()),
source_monitor = db_monitor(Source),
target_monitor = db_monitor(Target),
- source_seq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
use_checkpoints = get_value(use_checkpoints, Options, true),
checkpoint_interval = get_value(checkpoint_interval, Options, 5000)
},
@@ -650,10 +647,8 @@ do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} },
{ok, NewState};
do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
- SourceCurSeq = source_cur_seq(State),
- NewState = State#rep_state{source_seq = SourceCurSeq},
- update_task(NewState),
- {ok, NewState};
+ update_task(State),
+ {ok, State};
do_checkpoint(State) ->
#rep_state{
source_name=SourceName,
@@ -727,9 +722,7 @@ do_checkpoint(State) ->
Source, SourceLog#doc{body = NewRepHistory}, source),
{TgtRevPos, TgtRevId} = update_checkpoint(
Target, TargetLog#doc{body = NewRepHistory}, target),
- SourceCurSeq = source_cur_seq(State),
NewState = State#rep_state{
- source_seq = SourceCurSeq,
checkpoint_history = NewRepHistory,
committed_seq = NewTsSeq,
source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
@@ -882,54 +875,47 @@ db_monitor(_HttpDb) ->
nil.
-source_cur_seq(#rep_state{source = #httpdb{} = Db, source_seq = Seq}) ->
- case (catch couch_replicator_api_wrap:get_db_info(Db#httpdb{retries = 3})) of
- {ok, Info} ->
- get_value(<<"update_seq">>, Info, Seq);
+get_pending_count(#rep_state{source = #httpdb{} = Db0}=St) ->
+ {_, Seq} = St#rep_state.highest_seq_done,
+ Db = Db0#httpdb{retries = 3},
+ case (catch couch_replicator_api_wrap:get_pending_count(Db, Seq)) of
+ {ok, Pending} ->
+ Pending;
_ ->
- Seq
+ null
end;
-source_cur_seq(#rep_state{source = Db, source_seq = Seq}) ->
- {ok, Info} = couch_replicator_api_wrap:get_db_info(Db),
- get_value(<<"update_seq">>, Info, Seq).
+get_pending_count(#rep_state{source = Db}=St) ->
+ {_, Seq} = St#rep_state.highest_seq_done,
+ {ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, Seq),
+ Pending.
update_task(State) ->
#rep_state{
- current_through_seq = {_, CurSeq},
- source_seq = SourceCurSeq
+ highest_seq_done = {_, HighestSeq}
} = State,
couch_task_status:update(
rep_stats(State) ++ [
- {source_seq, SourceCurSeq},
- case {unpack_seq(CurSeq), unpack_seq(SourceCurSeq)} of
- {_, 0} ->
- {progress, 0};
- {CurSeq1, SourceCurSeq1} ->
- {progress, (CurSeq1 * 100) div SourceCurSeq1}
- end
+ {source_seq, HighestSeq},
+ {progress, 0}
]).
rep_stats(State) ->
#rep_state{
committed_seq = {_, CommittedSeq},
- stats = Stats,
- source_seq = SourceCurSeq
+ stats = Stats
} = State,
[
{revisions_checked, couch_replicator_stats:missing_checked(Stats)},
{missing_revisions_found, couch_replicator_stats:missing_found(Stats)},
{docs_read, couch_replicator_stats:docs_read(Stats)},
{docs_written, couch_replicator_stats:docs_written(Stats)},
- {changes_pending, pending(SourceCurSeq, CommittedSeq)},
+ {changes_pending, get_pending_count(State)},
{doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
{checkpointed_source_seq, CommittedSeq}
].
-pending(SourceCurSeq, CommittedSeq) ->
- unpack_seq(SourceCurSeq) - unpack_seq(CommittedSeq).
-
unpack_seq(Seq) when is_number(Seq) ->
Seq;
unpack_seq([SeqNum, _]) ->
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/756f500b/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index fa64377..3b2e1de 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -26,6 +26,7 @@
db_open/3,
db_close/1,
get_db_info/1,
+ get_pending_count/2,
update_doc/3,
update_doc/4,
update_docs/3,
@@ -124,6 +125,20 @@ get_db_info(#db{name = DbName, user_ctx = UserCtx}) ->
{ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}.
+get_pending_count(#httpdb{} = Db, Seq) ->
+ send_req(
+ Db,
+ [{path, "_changes"}, {qs, [{"since", Seq}, {"limit", "0"}]}],
+ fun(200, _, {Props}) ->
+ {ok, couch_util:get_value(<<"pending">>, Props, null)}
+ end);
+get_pending_count(#db{name=DbName, user_ctx = UserCtx}, Seq) ->
+ {ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
+ Pending = couch_db:count_changes_since(Db, Seq),
+ couch_db:close(Db),
+ {ok, Pending}.
+
+
ensure_full_commit(#httpdb{} = Db) ->
send_req(
Db,