You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:15:45 UTC

[14/37] couch-replicator commit: updated refs/heads/master to aafb5f9

Report a correct value for changes_pending

This uses the work done in BugzId: 24236 that provides a new pending
field in _changes feeds. The source _changes is polled with a
`?since=seq&limit=0` query string to get the remaining changes to be
processed for a given sequence.

BugzId: 26015


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/756f500b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/756f500b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/756f500b

Branch: refs/heads/master
Commit: 756f500b3194ddf7b4ccbb9421024cec055f9ee3
Parents: 0c095c0
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Dec 10 15:43:38 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 15:04:05 2014 +0100

----------------------------------------------------------------------
 src/couch_replicator.erl          | 58 +++++++++++++---------------------
 src/couch_replicator_api_wrap.erl | 15 +++++++++
 2 files changed, 37 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/756f500b/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index ef44c54..8041283 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -240,8 +240,8 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
         source_name = SourceName,
         target_name = TargetName,
         start_seq = {_Ts, StartSeq},
-        source_seq = SourceCurSeq,
         committed_seq = {_, CommittedSeq},
+        highest_seq_done = {_, HighestSeq},
         checkpoint_interval = CheckpointInterval
     } = State = init_state(Rep),
 
@@ -283,9 +283,9 @@ do_init(#rep{options = Options, id = {BaseId, Ext}, user_ctx=UserCtx} = Rep) ->
         {missing_revisions_found, 0},
         {docs_read, 0},
         {docs_written, 0},
-        {changes_pending, pending(SourceCurSeq, CommittedSeq)},
+        {changes_pending, get_pending_count(State)},
         {doc_write_failures, 0},
-        {source_seq, SourceCurSeq},
+        {source_seq, HighestSeq},
         {checkpointed_source_seq, CommittedSeq},
         {progress, 0},
         {checkpoint_interval, CheckpointInterval}
@@ -431,13 +431,11 @@ handle_call({report_seq_done, Seq, StatsInc}, From,
         "Seqs in progress were: ~p~nSeqs in progress are now: ~p",
         [Seq, ThroughSeq, NewThroughSeq, HighestDone,
             NewHighestDone, SeqsInProgress, NewSeqsInProgress]),
-    SourceCurSeq = source_cur_seq(State),
     NewState = State#rep_state{
         stats = couch_replicator_utils:sum_stats(Stats, StatsInc),
         current_through_seq = NewThroughSeq,
         seqs_in_progress = NewSeqsInProgress,
-        highest_seq_done = NewHighestDone,
-        source_seq = SourceCurSeq
+        highest_seq_done = NewHighestDone
     },
     update_task(NewState),
     {noreply, NewState}.
@@ -589,7 +587,6 @@ init_state(Rep) ->
             start_db_compaction_notifier(Target, self()),
         source_monitor = db_monitor(Source),
         target_monitor = db_monitor(Target),
-        source_seq = get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ),
         use_checkpoints = get_value(use_checkpoints, Options, true),
         checkpoint_interval = get_value(checkpoint_interval, Options, 5000)
     },
@@ -650,10 +647,8 @@ do_checkpoint(#rep_state{use_checkpoints=false} = State) ->
     NewState = State#rep_state{checkpoint_history = {[{<<"use_checkpoints">>, false}]} },
     {ok, NewState};
 do_checkpoint(#rep_state{current_through_seq=Seq, committed_seq=Seq} = State) ->
-    SourceCurSeq = source_cur_seq(State),
-    NewState = State#rep_state{source_seq = SourceCurSeq},
-    update_task(NewState),
-    {ok, NewState};
+    update_task(State),
+    {ok, State};
 do_checkpoint(State) ->
     #rep_state{
         source_name=SourceName,
@@ -727,9 +722,7 @@ do_checkpoint(State) ->
                 Source, SourceLog#doc{body = NewRepHistory}, source),
             {TgtRevPos, TgtRevId} = update_checkpoint(
                 Target, TargetLog#doc{body = NewRepHistory}, target),
-            SourceCurSeq = source_cur_seq(State),
             NewState = State#rep_state{
-                source_seq = SourceCurSeq,
                 checkpoint_history = NewRepHistory,
                 committed_seq = NewTsSeq,
                 source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
@@ -882,54 +875,47 @@ db_monitor(_HttpDb) ->
     nil.
 
 
-source_cur_seq(#rep_state{source = #httpdb{} = Db, source_seq = Seq}) ->
-    case (catch couch_replicator_api_wrap:get_db_info(Db#httpdb{retries = 3})) of
-    {ok, Info} ->
-        get_value(<<"update_seq">>, Info, Seq);
+get_pending_count(#rep_state{source = #httpdb{} = Db0}=St) ->
+    {_, Seq} = St#rep_state.highest_seq_done,
+    Db = Db0#httpdb{retries = 3},
+    case (catch couch_replicator_api_wrap:get_pending_count(Db, Seq)) of
+    {ok, Pending} ->
+        Pending;
     _ ->
-        Seq
+        null
     end;
-source_cur_seq(#rep_state{source = Db, source_seq = Seq}) ->
-    {ok, Info} = couch_replicator_api_wrap:get_db_info(Db),
-    get_value(<<"update_seq">>, Info, Seq).
+get_pending_count(#rep_state{source = Db}=St) ->
+    {_, Seq} = St#rep_state.highest_seq_done,
+    {ok, Pending} = couch_replicator_api_wrap:get_pending_count(Db, Seq),
+    Pending.
 
 
 update_task(State) ->
     #rep_state{
-        current_through_seq = {_, CurSeq},
-        source_seq = SourceCurSeq
+        highest_seq_done = {_, HighestSeq}
     } = State,
     couch_task_status:update(
         rep_stats(State) ++ [
-        {source_seq, SourceCurSeq},
-        case {unpack_seq(CurSeq), unpack_seq(SourceCurSeq)} of
-            {_, 0} ->
-                {progress, 0};
-            {CurSeq1, SourceCurSeq1} ->
-                {progress, (CurSeq1 * 100) div SourceCurSeq1}
-        end
+        {source_seq, HighestSeq},
+        {progress, 0}
     ]).
 
 
 rep_stats(State) ->
     #rep_state{
         committed_seq = {_, CommittedSeq},
-        stats = Stats,
-        source_seq = SourceCurSeq
+        stats = Stats
     } = State,
     [
         {revisions_checked, couch_replicator_stats:missing_checked(Stats)},
         {missing_revisions_found, couch_replicator_stats:missing_found(Stats)},
         {docs_read, couch_replicator_stats:docs_read(Stats)},
         {docs_written, couch_replicator_stats:docs_written(Stats)},
-        {changes_pending, pending(SourceCurSeq, CommittedSeq)},
+        {changes_pending, get_pending_count(State)},
         {doc_write_failures, couch_replicator_stats:doc_write_failures(Stats)},
         {checkpointed_source_seq, CommittedSeq}
     ].
 
-pending(SourceCurSeq, CommittedSeq) ->
-    unpack_seq(SourceCurSeq) - unpack_seq(CommittedSeq).
-
 unpack_seq(Seq) when is_number(Seq) ->
     Seq;
 unpack_seq([SeqNum, _]) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/756f500b/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index fa64377..3b2e1de 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -26,6 +26,7 @@
     db_open/3,
     db_close/1,
     get_db_info/1,
+    get_pending_count/2,
     update_doc/3,
     update_doc/4,
     update_docs/3,
@@ -124,6 +125,20 @@ get_db_info(#db{name = DbName, user_ctx = UserCtx}) ->
     {ok, [{couch_util:to_binary(K), V} || {K, V} <- Info]}.
 
 
+get_pending_count(#httpdb{} = Db, Seq) ->
+    send_req(
+        Db,
+        [{path, "_changes"}, {qs, [{"since", Seq}, {"limit", "0"}]}],
+        fun(200, _, {Props}) ->
+            {ok, couch_util:get_value(<<"pending">>, Props, null)}
+        end);
+get_pending_count(#db{name=DbName, user_ctx = UserCtx}, Seq) ->
+    {ok, Db} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
+    Pending = couch_db:count_changes_since(Db, Seq),
+    couch_db:close(Db),
+    {ok, Pending}.
+
+
 ensure_full_commit(#httpdb{} = Db) ->
     send_req(
         Db,