You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2011/07/08 20:38:44 UTC
svn commit: r1144419 - in /couchdb/trunk/src/couchdb: couch_replicator.erl
couch_replicator_doc_copier.erl couch_replicator_rev_finder.erl
Author: fdmanana
Date: Fri Jul 8 18:38:44 2011
New Revision: 1144419
URL: http://svn.apache.org/viewvc?rev=1144419&view=rev
Log:
Make replicator agnostic about the update seq type
To determine which row coming from a _changes feed came
earlier (or later), the replicator compared the rows' respective
update seq fields, which are numbers in Apache CouchDB. This
worked fine, as rows coming from the _changes stream are
sorted by their update seq field. However, it didn't work
when the update seq is not a number, as is the case with
BigCouch, for example (where update seqs are strings).
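Instead of comparing seq values, each row read from the _changes
feed is now tagged with a locally generated, increasing counter
(row_ts), and the seq reported to the replicator and recorded as the
checkpoint is the {Counter, Seq} pair. This change allows pull
replication from a BigCouch cluster, simplifies some code, and makes
the replicator more robust overall.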
Modified:
couchdb/trunk/src/couchdb/couch_replicator.erl
couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl
couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl
Modified: couchdb/trunk/src/couchdb/couch_replicator.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator.erl?rev=1144419&r1=1144418&r2=1144419&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator.erl Fri Jul 8 18:38:44 2011
@@ -591,16 +591,22 @@ spawn_changes_reader(StartSeq, #httpdb{}
spawn_link(fun() ->
put(last_seq, StartSeq),
put(retries_left, Db#httpdb.retries),
+ put(row_ts, 1),
read_changes(StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options)
end);
spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) ->
- spawn_link(fun() -> read_changes(StartSeq, Db, ChangesQueue, Options) end).
+ spawn_link(fun() ->
+ put(row_ts, 1),
+ read_changes(StartSeq, Db, ChangesQueue, Options)
+ end).
read_changes(StartSeq, Db, ChangesQueue, Options) ->
try
couch_api_wrap:changes_since(Db, all_docs, StartSeq,
fun(#doc_info{high_seq = Seq} = DocInfo) ->
- ok = couch_work_queue:queue(ChangesQueue, DocInfo),
+ Ts = get(row_ts),
+ ok = couch_work_queue:queue(ChangesQueue, {Ts, DocInfo}),
+ put(row_ts, Ts + 1),
put(last_seq, Seq)
end, Options),
couch_work_queue:close(ChangesQueue)
@@ -641,7 +647,7 @@ do_checkpoint(State) ->
target = Target,
history = OldHistory,
start_seq = StartSeq,
- current_through_seq = NewSeq,
+ current_through_seq = {_Ts, NewSeq} = NewTsSeq,
source_log = SourceLog,
target_log = TargetLog,
rep_starttime = ReplicationStartTime,
@@ -708,7 +714,7 @@ do_checkpoint(State) ->
Target, TargetLog#doc{body = NewRepHistory}, target),
NewState = State#rep_state{
checkpoint_history = NewRepHistory,
- committed_seq = NewSeq,
+ committed_seq = NewTsSeq,
source_log = SourceLog#doc{revs={SrcRevPos, [SrcRevId]}},
target_log = TargetLog#doc{revs={TgtRevPos, [TgtRevId]}}
},
Modified: couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl?rev=1144419&r1=1144418&r2=1144419&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator_doc_copier.erl Fri Jul 8 18:38:44 2011
@@ -50,7 +50,6 @@
-record(state, {
loop,
- cp,
max_parallel_conns,
source,
target,
@@ -58,7 +57,6 @@
writer = nil,
pending_fetch = nil,
flush_waiter = nil,
- highest_seq_seen = ?LOWEST_SEQ,
stats = #rep_stats{},
source_db_compaction_notifier = nil,
target_db_compaction_notifier = nil,
@@ -69,7 +67,7 @@
start_link(Cp, #db{} = Source, Target, MissingRevsQueue, _MaxConns) ->
Pid = spawn_link(
- fun() -> queue_fetch_loop(Source, Target, Cp, MissingRevsQueue) end),
+ fun() -> queue_fetch_loop(Source, Target, Cp, Cp, MissingRevsQueue) end),
{ok, Pid};
start_link(Cp, Source, Target, MissingRevsQueue, MaxConns) ->
@@ -81,10 +79,9 @@ init({Cp, Source, Target, MissingRevsQue
process_flag(trap_exit, true),
Parent = self(),
LoopPid = spawn_link(
- fun() -> queue_fetch_loop(Source, Target, Parent, MissingRevsQueue) end
+ fun() -> queue_fetch_loop(Source, Target, Parent, Cp, MissingRevsQueue) end
),
State = #state{
- cp = Cp,
max_parallel_conns = MaxConns,
loop = LoopPid,
source = open_db(Source),
@@ -97,19 +94,18 @@ init({Cp, Source, Target, MissingRevsQue
{ok, State}.
-handle_call({seq_done, Seq, RevCount}, {Pid, _},
- #state{loop = Pid, highest_seq_seen = HighSeq, stats = Stats} = State) ->
+handle_call({seq_done, _Seq, RevCount}, {Pid, _},
+ #state{loop = Pid, stats = Stats} = State) ->
NewState = State#state{
- highest_seq_seen = lists:max([Seq, HighSeq]),
stats = Stats#rep_stats{
missing_checked = Stats#rep_stats.missing_checked + RevCount
}
},
{reply, ok, NewState};
-handle_call({fetch_doc, {_Id, Revs, _PAs, Seq} = Params}, {Pid, _} = From,
+handle_call({fetch_doc, {_Id, Revs, _PAs, _Seq} = Params}, {Pid, _} = From,
#state{loop = Pid, readers = Readers, pending_fetch = nil,
- highest_seq_seen = HighSeq, stats = Stats, source = Src, target = Tgt,
+ stats = Stats, source = Src, target = Tgt,
max_parallel_conns = MaxConns} = State) ->
Stats2 = Stats#rep_stats{
missing_checked = Stats#rep_stats.missing_checked + length(Revs),
@@ -119,14 +115,12 @@ handle_call({fetch_doc, {_Id, Revs, _PAs
Size when Size < MaxConns ->
Reader = spawn_doc_reader(Src, Tgt, Params),
NewState = State#state{
- highest_seq_seen = lists:max([Seq, HighSeq]),
stats = Stats2,
readers = [Reader | Readers]
},
{reply, ok, NewState};
_ ->
NewState = State#state{
- highest_seq_seen = lists:max([Seq, HighSeq]),
stats = Stats2,
pending_fetch = {From, Params}
},
@@ -243,33 +237,33 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-queue_fetch_loop(Source, Target, Parent, MissingRevsQueue) ->
+queue_fetch_loop(Source, Target, Parent, Cp, MissingRevsQueue) ->
case couch_work_queue:dequeue(MissingRevsQueue, 1) of
closed ->
ok;
- {ok, [IdRevs]} ->
+ {ok, [{ReportSeq, IdRevs}]} ->
case Source of
#db{} ->
Source2 = open_db(Source),
Target2 = open_db(Target),
- {Stats, HighSeqDone} = local_process_batch(
- IdRevs, Source2, Target2, #batch{}, #rep_stats{}, ?LOWEST_SEQ),
+ Stats = local_process_batch(
+ IdRevs, Source2, Target2, #batch{}, #rep_stats{}),
close_db(Source2),
- close_db(Target2),
- ok = gen_server:cast(Parent, {report_seq_done, HighSeqDone, Stats}),
- ?LOG_DEBUG("Worker reported completion of seq ~p", [HighSeqDone]);
+ close_db(Target2);
#httpdb{} ->
- remote_process_batch(IdRevs, Parent)
+ remote_process_batch(IdRevs, Parent),
+ {ok, Stats} = gen_server:call(Parent, flush, infinity)
end,
- queue_fetch_loop(Source, Target, Parent, MissingRevsQueue)
+ ok = gen_server:cast(Cp, {report_seq_done, ReportSeq, Stats}),
+ ?LOG_DEBUG("Worker reported completion of seq ~p", [ReportSeq]),
+ queue_fetch_loop(Source, Target, Parent, Cp, MissingRevsQueue)
end.
-local_process_batch([], _Src, _Tgt, #batch{docs = []}, Stats, HighestSeqDone) ->
- {Stats, HighestSeqDone};
+local_process_batch([], _Src, _Tgt, #batch{docs = []}, Stats) ->
+ Stats;
-local_process_batch([], _Source, Target, #batch{docs = Docs, size = Size},
- Stats, HighestSeqDone) ->
+local_process_batch([], _Source, Target, #batch{docs = Docs, size = Size}, Stats) ->
case Target of
#httpdb{} ->
?LOG_DEBUG("Worker flushing doc batch of size ~p bytes", [Size]);
@@ -277,14 +271,13 @@ local_process_batch([], _Source, Target,
?LOG_DEBUG("Worker flushing doc batch of ~p docs", [Size])
end,
{Written, WriteFailures} = flush_docs(Target, Docs),
- Stats2 = Stats#rep_stats{
+ Stats#rep_stats{
docs_written = Stats#rep_stats.docs_written + Written,
doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures
- },
- {Stats2, HighestSeqDone};
+ };
local_process_batch([{Seq, {Id, Revs, NotMissingCount, PAs}} | Rest],
- Source, Target, Batch, Stats, HighestSeqSeen) ->
+ Source, Target, Batch, Stats) ->
{ok, {_, DocList, Written0, WriteFailures0}} = fetch_doc(
Source, {Id, Revs, PAs, Seq}, fun local_doc_handler/2,
{Target, [], 0, 0}),
@@ -303,12 +296,11 @@ local_process_batch([{Seq, {Id, Revs, No
docs_written = Stats#rep_stats.docs_written + Written,
doc_write_failures = Stats#rep_stats.doc_write_failures + WriteFailures
},
- local_process_batch(
- Rest, Source, Target, Batch2, Stats2, lists:max([Seq, HighestSeqSeen])).
+ local_process_batch(Rest, Source, Target, Batch2, Stats2).
-remote_process_batch([], Parent) ->
- ok = gen_server:call(Parent, flush, infinity);
+remote_process_batch([], _Parent) ->
+ ok;
remote_process_batch([{Seq, {Id, Revs, NotMissing, PAs}} | Rest], Parent) ->
case NotMissing > 0 of
@@ -416,17 +408,13 @@ spawn_writer(Target, #batch{docs = DocLi
end).
-after_full_flush(#state{cp = Cp, stats = Stats, flush_waiter = Waiter,
- highest_seq_seen = HighSeqDone} = State) ->
- ok = gen_server:cast(Cp, {report_seq_done, HighSeqDone, Stats}),
- ?LOG_DEBUG("Worker reported completion of seq ~p", [HighSeqDone]),
- gen_server:reply(Waiter, ok),
+after_full_flush(#state{stats = Stats, flush_waiter = Waiter} = State) ->
+ gen_server:reply(Waiter, {ok, Stats}),
State#state{
stats = #rep_stats{},
flush_waiter = nil,
writer = nil,
- batch = #batch{},
- highest_seq_seen = ?LOWEST_SEQ
+ batch = #batch{}
}.
Modified: couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl?rev=1144419&r1=1144418&r2=1144419&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl (original)
+++ couchdb/trunk/src/couchdb/couch_replicator_rev_finder.erl Fri Jul 8 18:38:44 2011
@@ -35,23 +35,24 @@ missing_revs_finder_loop(Cp, Target, Cha
closed ->
ok;
{ok, DocInfos} ->
- #doc_info{high_seq = ReportSeq} = lists:last(DocInfos),
+ {Ts, #doc_info{high_seq = Seq}} = lists:last(DocInfos),
+ ReportSeq = {Ts, Seq},
ok = gen_server:cast(Cp, {report_seq, ReportSeq}),
?LOG_DEBUG("Missing revs finder defined report seq ~p", [ReportSeq]),
IdRevs = [{Id, [Rev || #rev_info{rev = Rev} <- RevsInfo]} ||
- #doc_info{id = Id, revs = RevsInfo} <- DocInfos],
+ {_, #doc_info{id = Id, revs = RevsInfo}} <- DocInfos],
Target2 = open_db(Target),
{ok, Missing} = couch_api_wrap:get_missing_revs(Target2, IdRevs),
close_db(Target2),
- queue_missing_revs(Missing, DocInfos, RevsQueue),
+ queue_missing_revs(Missing, DocInfos, ReportSeq, RevsQueue),
missing_revs_finder_loop(Cp, Target2, ChangesQueue, RevsQueue, BatchSize)
end.
-queue_missing_revs(Missing, DocInfos, Queue) ->
+queue_missing_revs(Missing, DocInfos, ReportSeq, Queue) ->
IdRevsSeqDict = dict:from_list(
[{Id, {[Rev || #rev_info{rev = Rev} <- RevsInfo], Seq}} ||
- #doc_info{id = Id, revs = RevsInfo, high_seq = Seq} <- DocInfos]),
+ {_, #doc_info{id = Id, revs = RevsInfo, high_seq = Seq}} <- DocInfos]),
AllDict = lists:foldl(
fun({Id, MissingRevs, PAs}, Acc) ->
{_, Seq} = dict:fetch(Id, IdRevsSeqDict),
@@ -71,7 +72,7 @@ queue_missing_revs(Missing, DocInfos, Qu
AllDict, non_missing(IdRevsSeqDict, Missing)),
?LOG_DEBUG("Missing revs finder adding batch of ~p IdRevs to work queue",
[dict:size(AllDict2)]),
- ok = couch_work_queue:queue(Queue, dict:to_list(AllDict2)).
+ ok = couch_work_queue:queue(Queue, {ReportSeq, dict:to_list(AllDict2)}).
non_missing(NonMissingDict, []) ->