You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:20:41 UTC

[06/50] fabric commit: updated refs/heads/master to a71701c

Provide replacement shards in changes feeds

BugzId: 20423


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/cb388f29
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/cb388f29
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/cb388f29

Branch: refs/heads/master
Commit: cb388f29efcd1597f5c7ad35a361055f4bb91651
Parents: 18f6c8e
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Sep 12 12:25:33 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/cb388f29/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 69d9d09..19824bc 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -131,14 +131,22 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
         end
     end, unpack_seqs(PackedSeqs, DbName)),
     {Workers0, _} = lists:unzip(Seqs),
+    Repls = fabric_view:get_shard_replacements(DbName, Workers0),
+    StartFun = fun(#shard{name=Name, node=N}=Shard) ->
+        Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, 0]}),
+        Shard#shard{ref = Ref}
+    end,
     RexiMon = fabric_util:create_monitors(Workers0),
     try
-        case fabric_util:stream_start(Workers0, #shard.ref) of
+        case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
             {ok, Workers} ->
                 try
-                    LiveSeqs = lists:filter(fun({W, _S}) ->
-                        lists:member(W, Workers)
-                    end, Seqs),
+                    LiveSeqs = lists:map(fun(W) ->
+                        case lists:keyfind(W, 1, Seqs) of
+                            {W, Seq} -> {W, Seq};
+                            _ -> {W, 0}
+                        end
+                    end, Workers),
                     send_changes(DbName, Workers, LiveSeqs, ChangesArgs,
                             Callback, AccIn, Timeout)
                 after