You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/06 20:18:01 UTC
[11/27] couch-replicator commit: updated refs/heads/windsor-merge to 75e5ba1
Extract code that queues items into its own fun
BugzID: 24294
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/6f2dc4cf
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/6f2dc4cf
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/6f2dc4cf
Branch: refs/heads/windsor-merge
Commit: 6f2dc4cf8d5e14e8c416c6d1be4b65e82ebb029a
Parents: 7e28640
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Fri Oct 18 11:50:14 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Tue Jul 29 14:57:50 2014 +0100
----------------------------------------------------------------------
src/couch_replicator_changes_reader.erl | 70 ++++++++++++++--------------
1 file changed, 36 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/6f2dc4cf/src/couch_replicator_changes_reader.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_changes_reader.erl b/src/couch_replicator_changes_reader.erl
index 390a87e..045e074 100644
--- a/src/couch_replicator_changes_reader.erl
+++ b/src/couch_replicator_changes_reader.erl
@@ -31,7 +31,8 @@ start_link(StartSeq, #httpdb{} = Db, ChangesQueue, Options) ->
spawn_link(fun() ->
put(last_seq, StartSeq),
put(retries_left, Db#httpdb.retries),
- ?MODULE:read_changes(Parent, StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options, 1)
+ ?MODULE:read_changes(Parent, StartSeq, Db#httpdb{retries = 0},
+ ChangesQueue, Options, 1)
end);
start_link(StartSeq, Db, ChangesQueue, Options) ->
Parent = self(),
@@ -40,41 +41,11 @@ start_link(StartSeq, Db, ChangesQueue, Options) ->
end).
read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
+ Continuous = couch_util:get_value(continuous, Options),
try
couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq,
- fun(#doc_info{high_seq = Seq, id = Id} = DocInfo) ->
- case Id of
- <<>> ->
- % Previous CouchDB releases had a bug which allowed a doc
- % with an empty ID to be inserted into databases. Such doc
- % is impossible to GET.
- couch_log:error("Replicator: ignoring document with empty ID in "
- "source database `~s` (_changes sequence ~p)",
- [couch_replicator_api_wrap:db_uri(Db), Seq]);
- _ ->
- ok = couch_work_queue:queue(ChangesQueue, DocInfo)
- end,
- put(last_seq, Seq);
- ({last_seq, LS}) ->
- case get_value(continuous, Options) of
- true ->
- % LS should never be undefined, but it doesn't hurt to be
- % defensive inside the replicator.
- Seq = case LS of undefined -> get(last_seq); _ -> LS end,
- OldSeq = get(last_seq),
- if Seq == OldSeq -> ok; true ->
- Msg = {report_seq_done, {Ts, Seq}, #rep_stats{}},
- ok = gen_server:call(Parent, Msg, infinity)
- end,
- put(last_seq, Seq),
- throw(recurse);
- _ ->
- % This clause is unreachable today, but let's plan ahead
- % for the future where we checkpoint against last_seq
- % instead of the sequence of the last change. The two can
- % differ substantially in the case of a restrictive filter.
- ok
- end
+ fun(Item) ->
+ process_change(Item, {Parent, Db, ChangesQueue, Continuous, Ts})
end, Options),
couch_work_queue:close(ChangesQueue)
catch
@@ -103,3 +74,34 @@ read_changes(Parent, StartSeq, Db, ChangesQueue, Options, Ts) ->
exit(Error)
end
end.
+
+
+process_change(#doc_info{id = <<>>} = DocInfo, {_, Db, _, _, _}) ->
+ % Previous CouchDB releases had a bug which allowed a doc with an empty ID
+ % to be inserted into databases. Such doc is impossible to GET.
+ couch_log:error("Replicator: ignoring document with empty ID in "
+ "source database `~s` (_changes sequence ~p)",
+ [couch_replicator_api_wrap:db_uri(Db), DocInfo#doc_info.high_seq]);
+
+process_change(#doc_info{} = DocInfo, {_, _, ChangesQueue, _, _}) ->
+ ok = couch_work_queue:queue(ChangesQueue, DocInfo),
+ put(last_seq, DocInfo#doc_info.high_seq);
+
+process_change({last_seq, LS}, {Parent, _, _, true = _Continuous, Ts}) ->
+ % LS should never be undefined, but it doesn't hurt to be defensive inside
+ % the replicator.
+ Seq = case LS of undefined -> get(last_seq); _ -> LS end,
+ OldSeq = get(last_seq),
+ if Seq == OldSeq -> ok; true ->
+ Msg = {report_seq_done, {Ts, Seq}, #rep_stats{}},
+ ok = gen_server:call(Parent, Msg, infinity)
+ end,
+ put(last_seq, Seq),
+ throw(recurse);
+
+process_change({last_seq, _}, _) ->
+ % This clause is unreachable today, but let's plan ahead for the future
+ % where we checkpoint against last_seq instead of the sequence of the last
+ % change. The two can differ substantially in the case of a restrictive
+ % filter.
+ ok.