You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by bb...@apache.org on 2014/10/31 20:53:10 UTC

[1/5] couch-replicator commit: updated refs/heads/master to c9184cf

Repository: couchdb-couch-replicator
Updated Branches:
  refs/heads/master f195535e7 -> c9184cff4


remove old rcouch code


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/c9184cff
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/c9184cff
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/c9184cff

Branch: refs/heads/master
Commit: c9184cff45c9bfe0f1bbe560f06d6cfea2c5cf0c
Parents: a85c7c4
Author: Benjamin Bastian <be...@gmail.com>
Authored: Thu Oct 30 13:19:46 2014 -0700
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Thu Oct 30 13:29:56 2014 -0700

----------------------------------------------------------------------
 src/couch_replicator.erl | 70 -------------------------------------------
 1 file changed, 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/c9184cff/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 3356609..e7d2ee0 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -646,61 +646,6 @@ fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
     end.
 
 
-spawn_changes_reader(StartSeq, #httpdb{} = Db, ChangesQueue, Options) ->
-    spawn_link(fun() ->
-        put(last_seq, StartSeq),
-        put(retries_left, Db#httpdb.retries),
-        read_changes(StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options)
-    end);
-spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) ->
-    spawn_link(fun() ->
-        read_changes(StartSeq, Db, ChangesQueue, Options)
-    end).
-
-read_changes(StartSeq, Db, ChangesQueue, Options) ->
-    try
-        couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq, fun
-            (#doc_info{high_seq = Seq, revs = []}) ->
-                put(last_seq, Seq);
-            (#doc_info{high_seq = Seq, id = Id} = DocInfo) ->
-                case Id of
-                <<>> ->
-                    % Previous CouchDB releases had a bug which allowed a doc
-                    % with an empty ID to be inserted into databases. Such doc
-                    % is impossible to GET.
-                    ?LOG_ERROR("Replicator: ignoring document with empty ID in "
-                        "source database `~s` (_changes sequence ~p)",
-                        [couch_replicator_api_wrap:db_uri(Db), Seq]);
-                _ ->
-                    ok = couch_work_queue:queue(ChangesQueue, DocInfo)
-                end,
-                put(last_seq, Seq)
-            end, Options),
-        couch_work_queue:close(ChangesQueue)
-    catch exit:{http_request_failed, _, _, _} = Error ->
-        case get(retries_left) of
-        N when N > 0 ->
-            put(retries_left, N - 1),
-            LastSeq = get(last_seq),
-            Db2 = case LastSeq of
-            StartSeq ->
-                ?LOG_INFO("Retrying _changes request to source database ~s"
-                    " with since=~p in ~p seconds",
-                    [couch_replicator_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]),
-                ok = timer:sleep(Db#httpdb.wait),
-                Db#httpdb{wait = 2 * Db#httpdb.wait};
-            _ ->
-                ?LOG_INFO("Retrying _changes request to source database ~s"
-                    " with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]),
-                Db
-            end,
-            read_changes(LastSeq, Db2, ChangesQueue, Options);
-        _ ->
-            exit(Error)
-        end
-    end.
-
-
 spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->
     spawn_link(fun() ->
         changes_manager_loop_open(Parent, ChangesQueue, BatchSize, 1)
@@ -953,21 +898,6 @@ db_monitor(#db{} = Db) ->
 db_monitor(_HttpDb) ->
     nil.
 
-source_cur_seq(#rep_state{source = #httpdb{} = Db, source_seq = Seq,
-                          type = view, view = {DDoc, VName}}) ->
-    case (catch couch_replicator_api_wrap:get_view_info(
-                Db#httpdb{retries = 3}, DDoc, VName)) of
-    {ok, Info} ->
-        get_value(<<"update_seq">>, Info, Seq);
-    _ ->
-        Seq
-    end;
-
-source_cur_seq(#rep_state{source = Db, source_seq = Seq,
-                          type = view, view = {DDoc, VName}}) ->
-    {ok, Info} = couch_replicator_api_wrap:get_view_info(Db, DDoc, VName),
-    get_value(<<"update_seq">>, Info, Seq);
-
 get_pending_count(St) ->
     Rep = St#rep_state.rep_details,
     Timeout = get_value(connection_timeout, Rep#rep.options),


[3/5] couch-replicator commit: updated refs/heads/master to c9184cf

Posted by bb...@apache.org.
Update handle_changes function names


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/59a2f2bd
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/59a2f2bd
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/59a2f2bd

Branch: refs/heads/master
Commit: 59a2f2bd6c5693a81f42db21b7407cf185b2be5d
Parents: f195535
Author: Benjamin Bastian <be...@gmail.com>
Authored: Fri Aug 29 17:25:06 2014 +0700
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Thu Oct 30 13:29:56 2014 -0700

----------------------------------------------------------------------
 src/couch_replicator_api_wrap.erl | 2 +-
 src/couch_replicator_manager.erl  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/59a2f2bd/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index d739c8c..89c7eb7 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -502,7 +502,7 @@ changes_since(Db, Style, StartSeq, UserFun, Options) ->
     },
     QueryParams = get_value(query_params, Options, {[]}),
     Req = changes_json_req(Db, Filter, QueryParams, Options),
-    ChangesFeedFun = couch_changes:handle_changes(Args, {json_req, Req}, Db),
+    ChangesFeedFun = couch_changes:handle_db_changes(Args, {json_req, Req}, Db),
     ChangesFeedFun(fun({change, Change, _}, _) ->
             UserFun(json_to_doc_info(Change));
         (_, _) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/59a2f2bd/src/couch_replicator_manager.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl
index 19406e2..d6c88c1 100644
--- a/src/couch_replicator_manager.erl
+++ b/src/couch_replicator_manager.erl
@@ -296,7 +296,7 @@ changes_reader(Server, DbName, Since) ->
     UserCtx = #user_ctx{roles = [<<"_admin">>, <<"_replicator">>]},
     DbOpenOptions = [{user_ctx, UserCtx}, sys_db],
     {ok, Db} = couch_db:open_int(DbName, DbOpenOptions),
-    ChangesFeedFun = couch_changes:handle_changes(
+    ChangesFeedFun = couch_changes:handle_db_changes(
         #changes_args{
             include_docs = true,
             since = Since,


[4/5] couch-replicator commit: updated refs/heads/master to c9184cf

Posted by bb...@apache.org.
fix view changes with multiple keys

Conflicts:
	rebar.config
	src/couch_replicator.erl


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/a85c7c4a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/a85c7c4a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/a85c7c4a

Branch: refs/heads/master
Commit: a85c7c4aea154ccc52a7f36a3c0118c0b8b1d07b
Parents: 7390c7e
Author: benoitc <bc...@gmail.com>
Authored: Tue Oct 14 14:15:47 2014 +0200
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Thu Oct 30 13:29:56 2014 -0700

----------------------------------------------------------------------
 src/couch_replicator.erl | 25 ++++++++++++-------------
 1 file changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/a85c7c4a/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index 6992fff..3356609 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -488,18 +488,6 @@ terminate(shutdown, #rep_state{rep_details = #rep{id = RepId}} = State) ->
     couch_replicator_notifier:notify({error, RepId, <<"cancelled">>}),
     terminate_cleanup(State);
 
-terminate(Reason, #rep_state{} = State) ->
-    #rep_state{
-        source_name = Source,
-        target_name = Target,
-        rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep
-    } = State,
-    couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s",
-        [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
-    terminate_cleanup(State),
-    couch_replicator_notifier:notify({error, RepId, Reason}),
-    couch_replicator_manager:replication_error(Rep, Reason);
-
 terminate(shutdown, {error, Class, Error, Stack, InitArgs}) ->
     #rep{id=RepId} = InitArgs,
     couch_stats:increment_counter([couch_replicator, failed_starts]),
@@ -514,7 +502,18 @@ terminate(shutdown, {error, Class, Error, Stack, InitArgs}) ->
         NotifyError = Error
     end,
     couch_replicator_notifier:notify({error, RepId, NotifyError}),
-    couch_replicator_manager:replication_error(InitArgs, NotifyError).
+    couch_replicator_manager:replication_error(InitArgs, NotifyError);
+terminate(Reason, State) ->
+    #rep_state{
+        source_name = Source,
+        target_name = Target,
+        rep_details = #rep{id = {BaseId, Ext} = RepId} = Rep
+    } = State,
+    couch_log:error("Replication `~s` (`~s` -> `~s`) failed: ~s",
+        [BaseId ++ Ext, Source, Target, to_binary(Reason)]),
+    terminate_cleanup(State),
+    couch_replicator_notifier:notify({error, RepId, Reason}),
+    couch_replicator_manager:replication_error(Rep, Reason).
 
 terminate_cleanup(State) ->
     update_task(State),


[5/5] couch-replicator commit: updated refs/heads/master to c9184cf

Posted by bb...@apache.org.
couch_replicator: add replication using changes in a view

Instead of a database, the replicator can now filter the documents using
a view index. All documents having a key emitted in the view can be
replicated.

View parameters can be used. Which means that you can replicate results
corresponding to a key in a view or a range.

Conflicts:
	src/couch_replicator.erl
	src/couch_replicator_api_wrap.erl


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/3e3e2cb6
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/3e3e2cb6
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/3e3e2cb6

Branch: refs/heads/master
Commit: 3e3e2cb640a9babf90e6e2ba8af95c2c1cde6d74
Parents: 59a2f2b
Author: benoitc <be...@apache.org>
Authored: Sun Feb 9 00:43:23 2014 +0100
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Thu Oct 30 13:29:56 2014 -0700

----------------------------------------------------------------------
 src/couch_replicator.erl          | 38 +++++++++++++++++++++++++++++++---
 src/couch_replicator.hrl          |  2 ++
 src/couch_replicator_api_wrap.erl | 21 +++++++++++++++++++
 src/couch_replicator_utils.erl    | 27 ++++++++++++++++++++++--
 4 files changed, 83 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/3e3e2cb6/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index ee45d0b..f7e9d27 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -73,7 +73,9 @@
     target_monitor = nil,
     source_seq = nil,
     use_checkpoints = true,
-    checkpoint_interval = 5000
+    checkpoint_interval = 5000,
+    type = db,
+    view = nil
 }).
 
 
@@ -558,7 +560,8 @@ cancel_timer(#rep_state{timer = Timer} = State) ->
 init_state(Rep) ->
     #rep{
         source = Src, target = Tgt,
-        options = Options, user_ctx = UserCtx
+        options = Options, user_ctx = UserCtx,
+        type = Type, view = View
     } = Rep,
     {ok, Source} = couch_replicator_api_wrap:db_open(Src, [{user_ctx, UserCtx}]),
     {ok, Target} = couch_replicator_api_wrap:db_open(Tgt, [{user_ctx, UserCtx}],
@@ -572,6 +575,17 @@ init_state(Rep) ->
     {StartSeq0, History} = compare_replication_logs(SourceLog, TargetLog),
     StartSeq1 = get_value(since_seq, Options, StartSeq0),
     StartSeq = {0, StartSeq1},
+
+    SourceSeq = case Type of
+        db -> get_value(<<"update_seq">>, SourceInfo, ?LOWEST_SEQ);
+        view ->
+            {DDoc, VName} = View,
+            {ok, VInfo} = couch_replicator_api_wrap:get_view_info(Source, DDoc,
+                                                                  VName),
+            get_value(<<"update_seq">>, VInfo, ?LOWEST_SEQ)
+    end,
+
+
     #doc{body={CheckpointHistory}} = SourceLog,
     State = #rep_state{
         rep_details = Rep,
@@ -596,8 +610,12 @@ init_state(Rep) ->
             start_db_compaction_notifier(Target, self()),
         source_monitor = db_monitor(Source),
         target_monitor = db_monitor(Target),
+        source_seq = SourceSeq,
         use_checkpoints = get_value(use_checkpoints, Options, true),
-        checkpoint_interval = get_value(checkpoint_interval, Options, 5000)
+        checkpoint_interval = get_value(checkpoint_interval, Options,
+                                        5000),
+        type = Type,
+        view = View
     },
     State#rep_state{timer = start_timer(State)}.
 
@@ -883,6 +901,20 @@ db_monitor(#db{} = Db) ->
 db_monitor(_HttpDb) ->
     nil.
 
+source_cur_seq(#rep_state{source = #httpdb{} = Db, source_seq = Seq,
+                          type = view, view = {DDoc, VName}}) ->
+    case (catch couch_replicator_api_wrap:get_view_info(
+                Db#httpdb{retries = 3}, DDoc, VName)) of
+    {ok, Info} ->
+        get_value(<<"update_seq">>, Info, Seq);
+    _ ->
+        Seq
+    end;
+
+source_cur_seq(#rep_state{source = Db, source_seq = Seq,
+                          type = view, view = {DDoc, VName}}) ->
+    {ok, Info} = couch_replicator_api_wrap:get_view_info(Db, DDoc, VName),
+    get_value(<<"update_seq">>, Info, Seq);
 
 get_pending_count(St) ->
     Rep = St#rep_state.rep_details,

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/3e3e2cb6/src/couch_replicator.hrl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.hrl b/src/couch_replicator.hrl
index 893d9e4..dbb1793 100644
--- a/src/couch_replicator.hrl
+++ b/src/couch_replicator.hrl
@@ -18,5 +18,7 @@
     target,
     options,
     user_ctx,
+    type = db,
+    view = nil,
     doc_id
 }).

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/3e3e2cb6/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index 89c7eb7..97e9b25 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -19,6 +19,7 @@
 % Many options and apis aren't yet supported here, they are added as needed.
 
 -include_lib("couch/include/couch_db.hrl").
+-include_lib("couch_mrview/include/couch_mrview.hrl").
 -include("couch_replicator_api_wrap.hrl").
 
 -export([
@@ -27,6 +28,7 @@
     db_close/1,
     get_db_info/1,
     get_pending_count/2,
+    get_view_info/3,
     update_doc/3,
     update_doc/4,
     update_docs/3,
@@ -164,6 +166,15 @@ get_pending_count(#db{name=DbName}=Db, Seq) when is_number(Seq) ->
     couch_db:close(CountDb),
     {ok, Pending}.
 
+get_view_info(#httpdb{} = Db, DDocId, ViewName) ->
+    Path = iolist_to_binary([DDocId, "/_view/", ViewName, "/_info"]),
+    send_req(Db, [{path, Path}],
+        fun(200, _, {Props}) ->
+            {ok, Props}
+        end);
+get_view_info(#db{name = DbName}, DDocId, ViewName) ->
+    couch_mrview:get_view_info(DbName, DDocId, ViewName).
+
 
 ensure_full_commit(#httpdb{} = Db) ->
     send_req(
@@ -517,6 +528,10 @@ maybe_add_changes_filter_q_args(BaseQS, Options) ->
     undefined ->
         BaseQS;
     FilterName ->
+        %% get list of view attributes
+        ViewFields0 = [atom_to_list(F) || F <- record_info(fields,  mrargs)],
+        ViewFields = ["key" | ViewFields0],
+
         {Params} = get_value(query_params, Options, {[]}),
         [{"filter", ?b2l(FilterName)} | lists:foldl(
             fun({K, V}, QSAcc) ->
@@ -524,6 +539,12 @@ maybe_add_changes_filter_q_args(BaseQS, Options) ->
                 case lists:keymember(Ks, 1, QSAcc) of
                 true ->
                     QSAcc;
+                false when FilterName =:= <<"_view">> ->
+                    V1 = case lists:member(Ks, ViewFields) of
+                        true -> ?JSON_ENCODE(V);
+                        false -> couch_util:to_list(V)
+                    end,
+                    [{Ks, V1} | QSAcc];
                 false ->
                     [{Ks, couch_util:to_list(V)} | QSAcc]
                 end

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/3e3e2cb6/src/couch_replicator_utils.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_utils.erl b/src/couch_replicator_utils.erl
index 9d716c9..30afb39 100644
--- a/src/couch_replicator_utils.erl
+++ b/src/couch_replicator_utils.erl
@@ -40,13 +40,34 @@ parse_rep_doc({Props}, UserCtx) ->
     true ->
         {ok, #rep{options = Options, user_ctx = UserCtx}};
     false ->
-        Source = parse_rep_db(get_value(<<"source">>, Props), ProxyParams, Options),
-        Target = parse_rep_db(get_value(<<"target">>, Props), ProxyParams, Options),
+        Source = parse_rep_db(get_value(<<"source">>, Props),
+                              ProxyParams, Options),
+        Target = parse_rep_db(get_value(<<"target">>, Props),
+                              ProxyParams, Options),
+
+
+        {RepType, View} = case get_value(<<"filter">>, Props) of
+                <<"_view">> ->
+                    {QP}  = get_value(query_params, Options, {[]}),
+                    ViewParam = get_value(<<"view">>, QP),
+                    View1 = case re:split(ViewParam, <<"/">>) of
+                        [DName, ViewName] ->
+                            {<< "_design/", DName/binary >>, ViewName};
+                        _ ->
+                            throw({bad_request, "Invalid `view` parameter."})
+                    end,
+                    {view, View1};
+                _ ->
+                    {db, nil}
+            end,
+
         Rep = #rep{
             source = Source,
             target = Target,
             options = Options,
             user_ctx = UserCtx,
+            type = RepType,
+            view = View,
             doc_id = get_value(<<"_id">>, Props, null)
         },
         {ok, Rep#rep{id = replication_id(Rep)}}
@@ -103,6 +124,8 @@ maybe_append_filters(Base,
             DocIds ->
                 [DocIds]
             end;
+        <<"_", _/binary>> = Filter ->
+                [Filter, get_value(query_params, Options, {[]})];
         Filter ->
             [filter_code(Filter, Source, UserCtx),
                 get_value(query_params, Options, {[]})]


[2/5] couch-replicator commit: updated refs/heads/master to c9184cf

Posted by bb...@apache.org.
don't replicate removed document in a view changes

Conflicts:
	src/couch_replicator.erl
	src/couch_replicator_api_wrap.erl


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/commit/7390c7ec
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/tree/7390c7ec
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/diff/7390c7ec

Branch: refs/heads/master
Commit: 7390c7eccd19552ab842bc6b0a3c31fbbfc724c4
Parents: 3e3e2cb
Author: benoitc <bc...@gmail.com>
Authored: Thu Jul 3 13:08:34 2014 +0200
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Thu Oct 30 13:29:56 2014 -0700

----------------------------------------------------------------------
 src/couch_replicator.erl          | 53 ++++++++++++++++++++++++++++++++++
 src/couch_replicator_api_wrap.erl | 12 ++++++--
 src/couch_replicator_worker.erl   | 15 ++++++----
 3 files changed, 72 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/7390c7ec/src/couch_replicator.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator.erl b/src/couch_replicator.erl
index f7e9d27..6992fff 100644
--- a/src/couch_replicator.erl
+++ b/src/couch_replicator.erl
@@ -647,6 +647,59 @@ fold_replication_logs([Db | Rest] = Dbs, Vsn, LogId, NewId, Rep, Acc) ->
     end.
 
 
+spawn_changes_reader(StartSeq, #httpdb{} = Db, ChangesQueue, Options) ->
+    spawn_link(fun() ->
+        put(last_seq, StartSeq),
+        put(retries_left, Db#httpdb.retries),
+        read_changes(StartSeq, Db#httpdb{retries = 0}, ChangesQueue, Options)
+    end);
+spawn_changes_reader(StartSeq, Db, ChangesQueue, Options) ->
+    spawn_link(fun() ->
+        read_changes(StartSeq, Db, ChangesQueue, Options)
+    end).
+
+read_changes(StartSeq, Db, ChangesQueue, Options) ->
+    try
+        couch_replicator_api_wrap:changes_since(Db, all_docs, StartSeq, fun
+            (#doc_info{high_seq = Seq, revs = []}) ->
+                put(last_seq, Seq);
+            (#doc_info{high_seq = Seq, id = Id} = DocInfo) ->
+                case Id of
+                <<>> ->
+                    % Previous CouchDB releases had a bug which allowed a doc
+                    % with an empty ID to be inserted into databases. Such doc
+                    % is impossible to GET.
+                    ?LOG_ERROR("Replicator: ignoring document with empty ID in "
+                        "source database `~s` (_changes sequence ~p)",
+                        [couch_replicator_api_wrap:db_uri(Db), Seq]);
+                _ ->
+                    ok = couch_work_queue:queue(ChangesQueue, DocInfo)
+                end,
+                put(last_seq, Seq)
+            end, Options),
+        couch_work_queue:close(ChangesQueue)
+    catch exit:{http_request_failed, _, _, _} = Error ->
+        case get(retries_left) of
+        N when N > 0 ->
+            put(retries_left, N - 1),
+            LastSeq = get(last_seq),
+            Db2 = case LastSeq of
+            StartSeq ->
+                ?LOG_INFO("Retrying _changes request to source database ~s"
+                    " with since=~p in ~p seconds",
+                    [couch_replicator_api_wrap:db_uri(Db), LastSeq, Db#httpdb.wait / 1000]),
+                ok = timer:sleep(Db#httpdb.wait),
+                Db#httpdb{wait = 2 * Db#httpdb.wait};
+            _ ->
+                ?LOG_INFO("Retrying _changes request to source database ~s"
+                    " with since=~p", [couch_replicator_api_wrap:db_uri(Db), LastSeq]),
+                Db
+            end,
+            read_changes(LastSeq, Db2, ChangesQueue, Options);
+        _ ->
+            exit(Error)
+        end
+    end.
 
 
 spawn_changes_manager(Parent, ChangesQueue, BatchSize) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/7390c7ec/src/couch_replicator_api_wrap.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_api_wrap.erl b/src/couch_replicator_api_wrap.erl
index 97e9b25..c35659d 100644
--- a/src/couch_replicator_api_wrap.erl
+++ b/src/couch_replicator_api_wrap.erl
@@ -839,12 +839,21 @@ json_to_doc_info({Props}) ->
     undefined ->
         {last_seq, get_value(<<"last_seq">>, Props)};
     Changes ->
-        RevsInfo = lists:map(
+        RevsInfo0 = lists:map(
             fun({Change}) ->
                 Rev = couch_doc:parse_rev(get_value(<<"rev">>, Change)),
                 Del = couch_replicator_utils:is_deleted(Change),
                 #rev_info{rev=Rev, deleted=Del}
             end, Changes),
+
+        RevsInfo = case get_value(<<"removed">>, Props) of
+            true ->
+                [_ | RevsInfo1] = RevsInfo0,
+                RevsInfo1;
+            _ ->
+                RevsInfo0
+        end,
+
         #doc_info{
             id = get_value(<<"id">>, Props),
             high_seq = get_value(<<"seq">>, Props),
@@ -852,7 +861,6 @@ json_to_doc_info({Props}) ->
         }
     end.
 
-
 bulk_results_to_errors(Docs, {ok, Results}, interactive_edit) ->
     lists:reverse(lists:foldl(
         fun({_, {ok, _}}, Acc) ->

http://git-wip-us.apache.org/repos/asf/couchdb-couch-replicator/blob/7390c7ec/src/couch_replicator_worker.erl
----------------------------------------------------------------------
diff --git a/src/couch_replicator_worker.erl b/src/couch_replicator_worker.erl
index 7a6cc96..155e11d 100644
--- a/src/couch_replicator_worker.erl
+++ b/src/couch_replicator_worker.erl
@@ -491,12 +491,15 @@ flush_doc(Target, #doc{id = Id, revs = {Pos, [RevId | _]}} = Doc) ->
 
 
 find_missing(DocInfos, Target) ->
-    {IdRevs, AllRevsCount} = lists:foldr(
-        fun(#doc_info{id = Id, revs = RevsInfo}, {IdRevAcc, CountAcc}) ->
-            Revs = [Rev || #rev_info{rev = Rev} <- RevsInfo],
-            {[{Id, Revs} | IdRevAcc], CountAcc + length(Revs)}
-        end,
-        {[], 0}, DocInfos),
+    {IdRevs, AllRevsCount} = lists:foldr(fun
+                (#doc_info{revs = []}, {IdRevAcc, CountAcc}) ->
+                    {IdRevAcc, CountAcc};
+                (#doc_info{id = Id, revs = RevsInfo}, {IdRevAcc, CountAcc}) ->
+                    Revs = [Rev || #rev_info{rev = Rev} <- RevsInfo],
+                    {[{Id, Revs} | IdRevAcc], CountAcc + length(Revs)}
+            end, {[], 0}, DocInfos),
+
+
     {ok, Missing} = couch_replicator_api_wrap:get_missing_revs(Target, IdRevs),
     MissingRevsCount = lists:foldl(
         fun({_Id, MissingRevs, _PAs}, Acc) -> Acc + length(MissingRevs) end,