You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2011/07/08 20:38:44 UTC

svn commit: r1144419 - in /couchdb/trunk/src/couchdb: couch_replicator.erl couch_replicator_doc_copier.erl couch_replicator_rev_finder.erl

Author: fdmanana
Date: Fri Jul  8 18:38:44 2011
New Revision: 1144419

URL: http://svn.apache.org/viewvc?rev=1144419&view=rev
Log:
Make replicator agnostic about the update seq type

To determine which row, coming from a _changes feed, came
earlier (or later), the replicator compared their respective
update seq fields, which are numbers in Apache CouchDB. This
worked fine, as rows coming from the _changes stream are
sorted by their update seq field. However, it didn't work
when the update seq is not a number, as is the case with
BigCouch, for example (where update seqs are strings).

This change allows pull replication from a BigCouch cluster,
simplifies some code and makes it more robust overall.


Modified:
    couchdb/trunk/src/couchdb/couch_replicator.erl
    couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl
    couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl

Modified: couchdb/trunk/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1144419&r1=1144418&r2=1144419&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator.erl Fri Jul  8 18:38:44 2011
@@ -591,16 +591,22 @@ spawn_changes_reader(StartSeq, #httpdb{}
     spawn_link(fun() ->
         put(last_seq, StartSeq),
         put(retries_left, Db#httpdb.retries),
+        put(row_ts, 1),
         read_changes(StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options)
     end);
 spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) ->
-    spawn_link(fun() -> read_changes(StartSeq, Db, ChangesQueue, Options) end).
+    spawn_link(fun() ->
+        put(row_ts, 1),
+        read_changes(StartSeq, Db, ChangesQueue, Options)
+    end).
 
 read_changes(StartSeq, Db, ChangesQueue, Options) ->
     try
         couch_api_wrap:changes_since(Db, all_docs, StartSeq,
             fun(#doc_info{high_seq = Seq} = DocInfo) ->
-                ok = couch_work_queue:queue(ChangesQueue, DocInfo),
+                Ts = get(row_ts),
+                ok = couch_work_queue:queue(ChangesQueue, {Ts, DocInfo}),
+                put(row_ts, Ts + 1),
                 put(last_seq, Seq)
             end, Options),
         couch_work_queue:close(ChangesQueue)
@@ -641,7 +647,7 @@ do_checkpoint(State) ->
         target = Target,
         history = OldHistory,
         start_seq = StartSeq,
-        current_through_seq = NewSeq,
+        current_through_seq = {_Ts, NewSeq} = NewTsSeq,
         source_log = SourceLog,
         target_log = TargetLog,
         rep_starttime = ReplicationStartTime,
@@ -708,7 +714,7 @@ do_checkpoint(State) ->
                 Target, TargetLog#doc{body = NewRepHistory}, target),
             NewState = State#rep_state{
                 checkpoint_history = NewRepHistory,
-                committed_seq = NewSeq,
+                committed_seq = NewTsSeq,
                 source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
                 target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
             },

Modified: couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl?rev=1144419&r1=1144418&r2=1144419&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl Fri Jul  8 18:38:44 2011
@@ -50,7 +50,6 @@
 
 -record(state, {
     loop,
-    cp,
     max_parallel_conns,
     source,
     target,
@@ -58,7 +57,6 @@
     writer = nil,
     pending_fetch = nil,
     flush_waiter = nil,
-    highest_seq_seen = ?LOWEST_SEQ,
     stats = #rep_stats{},
     source_db_compaction_notifier = nil,
     target_db_compaction_notifier = nil,
@@ -69,7 +67,7 @@
 
 start_link(Cp, #db{} = Source, Target, MissingRevsQueue, _MaxConns) ->
     Pid = spawn_link(
-        fun() -> queue_fetch_loop(Source, Target, Cp, MissingRevsQueue) end),
+        fun() -> queue_fetch_loop(Source, Target, Cp, Cp, MissingRevsQueue) end),
     {ok, Pid};
 
 start_link(Cp, Source, Target, MissingRevsQueue, MaxConns) ->
@@ -81,10 +79,9 @@ init({Cp, Source, Target, MissingRevsQue
     process_flag(trap_exit, true),
     Parent = self(),
     LoopPid = spawn_link(
-        fun() -> queue_fetch_loop(Source, Target, Parent, MissingRevsQueue) end
+        fun() -> queue_fetch_loop(Source, Target, Parent, Cp, MissingRevsQueue) end
     ),
     State = #state{
-        cp = Cp,
         max_parallel_conns = MaxConns,
         loop = LoopPid,
         source = open_db(Source),
@@ -97,19 +94,18 @@ init({Cp, Source, Target, MissingRevsQue
     {ok, State}.
 
 
-handle_call({seq_done, Seq, RevCount}, {Pid, _},
-    #state{loop = Pid, highest_seq_seen = HighSeq, stats = Stats} = State) ->
+handle_call({seq_done, _Seq, RevCount}, {Pid, _},
+    #state{loop = Pid, stats = Stats} = State) ->
     NewState = State#state{
-        highest_seq_seen = lists:max([Seq, HighSeq]),
         stats = Stats#rep_stats{
             missing_checked = Stats#rep_stats.missing_checked + RevCount
         }
     },
     {reply, ok, NewState};
 
-handle_call({fetch_doc, {_Id, Revs, _PAs, Seq} = Params}, {Pid, _} = From,
+handle_call({fetch_doc, {_Id, Revs, _PAs, _Seq} = Params}, {Pid, _} = From,
     #state{loop = Pid, readers = Readers, pending_fetch = nil,
-        highest_seq_seen = HighSeq, stats = Stats, source = Src, target = Tgt,
+        stats = Stats, source = Src, target = Tgt,
         max_parallel_conns = MaxConns} = State) ->
     Stats2 = Stats#rep_stats{
         missing_checked = Stats#rep_stats.missing_checked + length(Revs),
@@ -119,14 +115,12 @@ handle_call({fetch_doc, {_Id, Revs, _PAs
     Size when Size < MaxConns ->
         Reader = spawn_doc_reader(Src, Tgt, Params),
         NewState = State#state{
-            highest_seq_seen = lists:max([Seq, HighSeq]),
             stats = Stats2,
             readers = [Reader | Readers]
         },
         {reply, ok, NewState};
     _ ->
         NewState = State#state{
-            highest_seq_seen = lists:max([Seq, HighSeq]),
             stats = Stats2,
             pending_fetch = {From, Params}
         },
@@ -243,33 +237,33 @@ code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
 
-queue_fetch_loop(Source, Target, Parent, MissingRevsQueue) ->
+queue_fetch_loop(Source, Target, Parent, Cp, MissingRevsQueue) ->
     case couch_work_queue:dequeue(MissingRevsQueue, 1) of
     closed ->
         ok;
-    {ok, [IdRevs]} ->
+    {ok, [{ReportSeq, IdRevs}]} ->
         case Source of
         #db{} ->
             Source2 = open_db(Source),
             Target2 = open_db(Target),
-            {Stats, HighSeqDone} = local_process_batch(
-                IdRevs, Source2, Target2, #batch{}, #rep_stats{}, ?LOWEST_SEQ),
+            Stats = local_process_batch(
+                IdRevs, Source2, Target2, #batch{}, #rep_stats{}),
             close_db(Source2),
-            close_db(Target2),
-            ok = gen_server:cast(Parent, {report_seq_done, HighSeqDone, Stats}),
-            ?LOG_DEBUG("Worker reported completion of seq ~p", [HighSeqDone]);
+            close_db(Target2);
         #httpdb{} ->
-            remote_process_batch(IdRevs, Parent)
+            remote_process_batch(IdRevs, Parent),
+            {ok, Stats} = gen_server:call(Parent, flush, infinity)
         end,
-        queue_fetch_loop(Source, Target, Parent, MissingRevsQueue)
+        ok = gen_server:cast(Cp, {report_seq_done, ReportSeq, Stats}),
+        ?LOG_DEBUG("Worker reported completion of seq ~p", [ReportSeq]),
+        queue_fetch_loop(Source, Target, Parent, Cp, MissingRevsQueue)
     end.
 
 
-local_process_batch([], _Src, _Tgt, #batch{docs = []}, Stats, HighestSeqDone) ->
-    {Stats, HighestSeqDone};
+local_process_batch([], _Src, _Tgt, #batch{docs = []}, Stats) ->
+    Stats;
 
-local_process_batch([], _Source, Target, #batch{docs = Docs, size = Size},
-    Stats, HighestSeqDone) ->
+local_process_batch([], _Source, Target, #batch{docs = Docs, size = Size}, Stats) ->
     case Target of
     #httpdb{} ->
         ?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]);
@@ -277,14 +271,13 @@ local_process_batch([], _Source, Target,
         ?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size])
     end,
     {Written, WriteFailures} = flush_docs(Target, Docs),
-    Stats2 = Stats#rep_stats{
+    Stats#rep_stats{
         docs_written = Stats#rep_stats.docs_written + Written,
         doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures
-    },
-    {Stats2, HighestSeqDone};
+    };
 
 local_process_batch([{Seq, {Id, Revs, NotMissingCount, PAs}} | Rest],
-    Source, Target, Batch, Stats, HighestSeqSeen) ->
+    Source, Target, Batch, Stats) ->
     {ok, {_, DocList, Written0, WriteFailures0}} = fetch_doc(
         Source, {Id, Revs, PAs, Seq}, fun local_doc_handler/2,
         {Target, [], 0, 0}),
@@ -303,12 +296,11 @@ local_process_batch([{Seq, {Id, Revs, No
         docs_written = Stats#rep_stats.docs_written + Written,
         doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures
     },
-    local_process_batch(
-        Rest, Source, Target, Batch2, Stats2, lists:max([Seq, HighestSeqSeen])).
+    local_process_batch(Rest, Source, Target, Batch2, Stats2).
 
 
-remote_process_batch([], Parent) ->
-    ok = gen_server:call(Parent, flush, infinity);
+remote_process_batch([], _Parent) ->
+    ok;
 
 remote_process_batch([{Seq, {Id, Revs, NotMissing, PAs}} | Rest], Parent) ->
     case NotMissing > 0 of
@@ -416,17 +408,13 @@ spawn_writer(Target, #batch{docs = DocLi
         end).
 
 
-after_full_flush(#state{cp = Cp, stats = Stats, flush_waiter = Waiter,
-        highest_seq_seen = HighSeqDone} = State) ->
-    ok = gen_server:cast(Cp, {report_seq_done, HighSeqDone, Stats}),
-    ?LOG_DEBUG("Worker reported completion of seq ~p", [HighSeqDone]),
-    gen_server:reply(Waiter, ok),
+after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) ->
+    gen_server:reply(Waiter, {ok, Stats}),
     State#state{
         stats = #rep_stats{},
         flush_waiter = nil,
         writer = nil,
-        batch = #batch{},
-        highest_seq_seen = ?LOWEST_SEQ
+        batch = #batch{}
     }.
 
 

Modified: couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl?rev=1144419&r1=1144418&r2=1144419&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl Fri Jul  8 18:38:44 2011
@@ -35,23 +35,24 @@ missing_revs_finder_loop(Cp, Target, Cha
     closed ->
         ok;
     {ok, DocInfos} ->
-        #doc_info{high_seq = ReportSeq} = lists:last(DocInfos),
+        {Ts, #doc_info{high_seq = Seq}} = lists:last(DocInfos),
+        ReportSeq = {Ts, Seq},
         ok = gen_server:cast(Cp, {report_seq, ReportSeq}),
         ?LOG_DEBUG("Missing revs finder defined report seq ~p", [ReportSeq]),
         IdRevs = [{Id, [Rev || #rev_info{rev = Rev} <- RevsInfo]} ||
-                    #doc_info{id = Id, revs = RevsInfo} <- DocInfos],
+                    {_, #doc_info{id = Id, revs = RevsInfo}} <- DocInfos],
         Target2 = open_db(Target),
         {ok, Missing} = couch_api_wrap:get_missing_revs(Target2, IdRevs),
         close_db(Target2),
-        queue_missing_revs(Missing, DocInfos, RevsQueue),
+        queue_missing_revs(Missing, DocInfos, ReportSeq, RevsQueue),
         missing_revs_finder_loop(Cp, Target2, ChangesQueue, RevsQueue, BatchSize)
     end.
 
 
-queue_missing_revs(Missing, DocInfos, Queue) ->
+queue_missing_revs(Missing, DocInfos, ReportSeq, Queue) ->
     IdRevsSeqDict = dict:from_list(
         [{Id, {[Rev || #rev_info{rev = Rev} <- RevsInfo], Seq}} ||
-            #doc_info{id = Id, revs = RevsInfo, high_seq = Seq} <- DocInfos]),
+            {_, #doc_info{id = Id, revs = RevsInfo, high_seq = Seq}} <- DocInfos]),
     AllDict = lists:foldl(
         fun({Id, MissingRevs, PAs}, Acc) ->
             {_, Seq} = dict:fetch(Id, IdRevsSeqDict),
@@ -71,7 +72,7 @@ queue_missing_revs(Missing, DocInfos, Qu
         AllDict, non_missing(IdRevsSeqDict, Missing)),
     ?LOG_DEBUG("Missing revs finder adding batch of ~p IdRevs to work queue",
         [dict:size(AllDict2)]),
-    ok = couch_work_queue:queue(Queue, dict:to_list(AllDict2)).
+    ok = couch_work_queue:queue(Queue, {ReportSeq, dict:to_list(AllDict2)}).
 
 
 non_missing(NonMissingDict, []) ->