You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/28 14:20:41 UTC
[06/50] fabric commit: updated refs/heads/master to a71701c
Provide replacement shards in changes feeds
BugzId: 20423
Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/cb388f29
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/cb388f29
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/cb388f29
Branch: refs/heads/master
Commit: cb388f29efcd1597f5c7ad35a361055f4bb91651
Parents: 18f6c8e
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Sep 12 12:25:33 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100
----------------------------------------------------------------------
src/fabric_view_changes.erl | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/cb388f29/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 69d9d09..19824bc 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -131,14 +131,22 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
end
end, unpack_seqs(PackedSeqs, DbName)),
{Workers0, _} = lists:unzip(Seqs),
+ Repls = fabric_view:get_shard_replacements(DbName, Workers0),
+ StartFun = fun(#shard{name=Name, node=N}=Shard) ->
+ Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, 0]}),
+ Shard#shard{ref = Ref}
+ end,
RexiMon = fabric_util:create_monitors(Workers0),
try
- case fabric_util:stream_start(Workers0, #shard.ref) of
+ case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
{ok, Workers} ->
try
- LiveSeqs = lists:filter(fun({W, _S}) ->
- lists:member(W, Workers)
- end, Seqs),
+ LiveSeqs = lists:map(fun(W) ->
+ case lists:keyfind(W, 1, Seqs) of
+ {W, Seq} -> {W, Seq};
+ _ -> {W, 0}
+ end
+ end, Workers),
send_changes(DbName, Workers, LiveSeqs, ChangesArgs,
Callback, AccIn, Timeout)
after