You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 11:11:49 UTC

[01/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Repository: couchdb-fabric
Updated Branches:
  refs/heads/windsor-merge-121 [created] 79e6e2fd1


Use new couch_db APIs for uuids and epochs

This uses the new API to get the uuid and epochs for a database.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/ae1c64a2
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/ae1c64a2
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/ae1c64a2

Branch: refs/heads/windsor-merge-121
Commit: ae1c64a2d947889bfb82e7cca17dd58f6e3ffec1
Parents: a23d2af
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Jun 13 16:55:25 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 10:48:55 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 46 +++++++++++++++++++++++++++-------------------
 1 file changed, 27 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/ae1c64a2/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index ef0e737..870ac57 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -41,7 +41,7 @@ changes(DbName, Options, StartVector) ->
         try
             {ok, {_, LastSeq, _, _}} =
                 couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0),
-            rexi:reply({complete, {LastSeq, Db#db.uuid, node()}})
+            rexi:reply({complete, {LastSeq, couch_db:get_uuid(Db), node()}})
         after
             couch_db:close(Db)
         end;
@@ -279,11 +279,14 @@ changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->
 
 changes_row(Db, #doc_info{id=Id, high_seq=Seq}=DI, Results, Del, true, Opts) ->
     Doc = doc_member(Db, DI, Opts),
-    #change{key={Seq, Db#db.uuid, node()}, id=Id, value=Results, doc=Doc, deleted=Del};
+    Uuid = couch_db:get_uuid(Db),
+    #change{key={Seq, Uuid, node()}, id=Id, value=Results, doc=Doc, deleted=Del};
 changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) ->
-    #change{key={Seq, Db#db.uuid, node()}, id=Id, value=Results, deleted=true};
+    Uuid = couch_db:get_uuid(Db),
+    #change{key={Seq, Uuid, node()}, id=Id, value=Results, deleted=true};
 changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) ->
-    #change{key={Seq, Db#db.uuid, node()}, id=Id, value=Results}.
+    Uuid = couch_db:get_uuid(Db),
+    #change{key={Seq, Uuid, node()}, id=Id, value=Results}.
 
 doc_member(Shard, DocInfo, Opts) ->
     case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of
@@ -365,16 +368,20 @@ set_io_priority(DbName, Options) ->
             ok
     end.
 
-calculate_start_seq(#db{uuid=Uuid1}, {_, Uuid2, _}) when Uuid1 =/= Uuid2 ->
-    %% The file was rebuilt, most likely in a different order, so rewind.
-    0;
-calculate_start_seq(#db{}=Db, {Seq, _Uuid, Node}) ->
-    case owner(Node, Seq, Db#db.epochs) of
-        true  -> Seq;
-        false -> 0
-    end;
 calculate_start_seq(_Db, Seq) when is_integer(Seq) ->
-    Seq.
+    Seq;
+calculate_start_seq(Db, {Seq, Uuid, Node}) ->
+    case couch_db:get_uuid(Db) == Uuid of
+        true ->
+            case owner(Node, Seq, couch_db:get_epochs(Db)) of
+                true -> Seq;
+                false -> 0
+            end;
+        false ->
+            %% The file was rebuilt, most likely in a different
+            %% order, so rewind.
+            0
+    end.
 
 owner(Node, Seq, Epochs) ->
     owner(Node, Seq, Epochs, infinity).
@@ -392,14 +399,15 @@ owner(Node, Seq, [{_EpochNode, EpochSeq} | Rest], _HighSeq) ->
 
 calculate_start_seq_test() ->
     %% uuid mismatch is always a rewind.
-    ?assertEqual(0, calculate_start_seq(#db{uuid=uuid1}, {1, uuid2, node1})),
+    Hdr1 = couch_db_header:new(),
+    Hdr2 = couch_db_header:set(Hdr1, [{uuid, uuid1}]),
+    ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, {1, uuid2, node1})),
     %% uuid matches and seq is owned by node.
-    ?assertEqual(2, calculate_start_seq(#db{uuid=uuid1, epochs=[{node1, 1}]},
-                                        {2, uuid1, node1})),
+    Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]),
+    ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, {2, uuid1, node1})),
     %% uuids match but seq is not owned by node.
-    ?assertEqual(0, calculate_start_seq(#db{uuid=uuid1,
-                                            epochs=[{node2, 2}, {node1, 1}]},
-                                        {3, uuid1, node1})),
+    Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]),
+    ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, {3, uuid1, node1})),
     %% return integer if we didn't get a vector.
     ?assertEqual(4, calculate_start_seq(#db{}, 4)).
 


[49/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Switch from ddoc to id+rev protocol for views


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/df0cd0b7
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/df0cd0b7
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/df0cd0b7

Branch: refs/heads/windsor-merge-121
Commit: df0cd0b7d43ce027ce62f79b7a71de99da356761
Parents: c5fbac5
Author: Brian Mitchell <br...@p2p.io>
Authored: Fri Oct 25 11:44:00 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 20:13:29 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_map.erl    | 2 +-
 src/fabric_view_reduce.erl | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/df0cd0b7/src/fabric_view_map.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index d2c010a..1201daf 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -26,7 +26,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
 go(DbName, DDoc, View, Args, Callback, Acc) ->
     Shards = fabric_view:get_shards(DbName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
-    RPCArgs = [DDoc, View, Args],
+    RPCArgs = [fabric_util:doc_id_and_rev(DDoc), View, Args],
     StartFun = fun(Shard) ->
         hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
     end,

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/df0cd0b7/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 67c36a3..2e0d1f2 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -25,7 +25,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
 
 go(DbName, DDoc, VName, Args, Callback, Acc, {red, {_, Lang, _}, _}=VInfo) ->
     RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
-    RPCArgs = [DDoc, VName, Args],
+    RPCArgs = [fabric_util:doc_id_and_rev(DDoc), VName, Args],
     Shards = fabric_view:get_shards(DbName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
     StartFun = fun(Shard) ->


[26/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Add fabric_dict:is_key/2

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/44185ed3
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/44185ed3
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/44185ed3

Branch: refs/heads/windsor-merge-121
Commit: 44185ed3750fd64418ea64ca3c4cd6afc787e49e
Parents: beaf739
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Sep 4 15:58:42 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 13:44:03 2014 +0100

----------------------------------------------------------------------
 src/fabric_dict.erl | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/44185ed3/src/fabric_dict.erl
----------------------------------------------------------------------
diff --git a/src/fabric_dict.erl b/src/fabric_dict.erl
index a9d7fea..20700c0 100644
--- a/src/fabric_dict.erl
+++ b/src/fabric_dict.erl
@@ -19,6 +19,8 @@
 init(Keys, InitialValue) ->
     orddict:from_list([{Key, InitialValue} || Key <- Keys]).
 
+is_key(Key, Dict) ->
+    orddict:is_key(Key, Dict).
 
 decrement_all(Dict) ->
     [{K,V-1} || {K,V} <- Dict].


[28/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Implement fabric_util:stream_start/2

This is a utility function that handles gathering the start of a rexi
stream. It will return a single full hash ring of shards or an error.

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/f0c675e9
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/f0c675e9
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/f0c675e9

Branch: refs/heads/windsor-merge-121
Commit: f0c675e993b3d138eaff4d5cd8c2c3a9e7318564
Parents: 03086f3
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Sep 6 07:26:32 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 13:55:43 2014 +0100

----------------------------------------------------------------------
 src/fabric_util.erl | 52 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 52 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/f0c675e9/src/fabric_util.erl
----------------------------------------------------------------------
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index c93efda..f11abe3 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -16,6 +16,7 @@
         update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
         remove_down_workers/2]).
 -export([request_timeout/0]).
+-export([stream_start/2]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -44,6 +45,57 @@ submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
 cleanup(Workers) ->
     [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
 
+stream_start(Workers0, Keypos) ->
+    Fun = fun handle_stream_start/3,
+    Acc = fabric_dict:init(Workers0, waiting),
+    Timeout = request_timeout(),
+    case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
+        {ok, Workers} ->
+            true = fabric_view:is_progress_possible(Workers),
+            AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
+                rexi:stream_start(From),
+                [Worker | WorkerAcc]
+            end, [], Workers),
+            {ok, AckedWorkers};
+        Else ->
+            Else
+    end.
+
+handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
+    case fabric_util:remove_down_workers(State, NodeRef) of
+    {ok, NewState} ->
+        {ok, NewState};
+    error ->
+        Reason = {nodedown, <<"progress not possible">>},
+        {error, Reason}
+    end;
+handle_stream_start({rexi_EXIT, Reason}, Worker, State) ->
+    NewState = fabric_dict:erase(Worker, State),
+    case fabric_view:is_progress_possible(NewState) of
+    true ->
+        {ok, NewState};
+    false ->
+        {error, fabric_util:error_info(Reason)}
+    end;
+handle_stream_start(rexi_STREAM_INIT, {Worker, From}, State) ->
+    case fabric_dict:lookup_element(Worker, State) of
+    undefined ->
+        % This worker lost the race with other partition copies, terminate
+        rexi:stream_cancel(From),
+        {ok, State};
+    waiting ->
+        % Don't ack the worker yet so they don't start sending us
+        % rows until we're ready
+        NewState0 = fabric_dict:store(Worker, From, State),
+        NewState1 = fabric_view:remove_overlapping_shards(Worker, NewState0),
+        case fabric_dict:any(waiting, NewState1) of
+            true -> {ok, NewState1};
+            false -> {stop, NewState1}
+        end
+    end;
+handle_stream_start(Else, _, _) ->
+    exit({invalid_stream_start, Else}).
+
 recv(Workers, Keypos, Fun, Acc0) ->
     rexi_utils:recv(Workers, Keypos, Fun, Acc0, request_timeout(), infinity).
 


[25/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Use the dollar_endonly option to prevent trailing newlines in database names

BugzID: 24203


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/beaf7390
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/beaf7390
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/beaf7390

Branch: refs/heads/windsor-merge-121
Commit: beaf7390fdf9c0d28d58637eca8d01aa090415ee
Parents: feb5694
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Wed Oct 16 13:56:35 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 13:43:43 2014 +0100

----------------------------------------------------------------------
 src/fabric_db_create.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/beaf7390/src/fabric_db_create.erl
----------------------------------------------------------------------
diff --git a/src/fabric_db_create.erl b/src/fabric_db_create.erl
index c8f2d45..5a8334f 100644
--- a/src/fabric_db_create.erl
+++ b/src/fabric_db_create.erl
@@ -42,7 +42,7 @@ validate_dbname(DbName, Options) ->
     false ->
         ok;
     true ->
-        case re:run(DbName, ?DBNAME_REGEX, [{capture,none}]) of
+        case re:run(DbName, ?DBNAME_REGEX, [{capture,none}, dollar_endonly]) of
         match ->
             ok;
         nomatch when DbName =:= <<"_users">> ->


[31/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Update _all_docs coordinator to use new RPC APIs

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/ad460e40
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/ad460e40
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/ad460e40

Branch: refs/heads/windsor-merge-121
Commit: ad460e40eed6e2162d42c7eb2bc7e1306af4f092
Parents: dae200f
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Sep 4 17:28:57 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 14:37:06 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_all_docs.erl | 127 ++++++++++++++++++++------------------
 1 file changed, 66 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/ad460e40/src/fabric_view_all_docs.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl
index e5ed7b3..8dc9173 100644
--- a/src/fabric_view_all_docs.erl
+++ b/src/fabric_view_all_docs.erl
@@ -20,29 +20,26 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
-go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc0) ->
-    Workers = fabric_util:submit_jobs(mem3:shards(DbName),all_docs,[Options, QueryArgs]),
-    #mrargs{limit = Limit, skip = Skip} = QueryArgs,
-    State = #collector{
-        query_args = QueryArgs,
-        callback = Callback,
-        counters = fabric_dict:init(Workers, 0),
-        skip = Skip,
-        limit = Limit,
-        user_acc = Acc0
-    },
-    RexiMon = fabric_util:create_monitors(Workers),
-    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
-        State, infinity, 5000) of
-    {ok, NewState} ->
-        {ok, NewState#collector.user_acc};
-    {timeout, NewState} ->
-        Callback({error, timeout}, NewState#collector.user_acc);
-    {error, Resp} ->
-        {ok, Resp}
+go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
+    Shards = mem3:shards(DbName),
+    Workers0 = fabric_util:submit_jobs(
+            Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
+    RexiMon = fabric_util:create_monitors(Workers0),
+    try
+        case fabric_util:stream_start(Workers0, #shard.ref) of
+            {ok, Workers} ->
+                try
+                    go(DbName, Options, Workers, QueryArgs, Callback, Acc)
+                after
+                    fabric_util:cleanup(Workers)
+                end;
+            {timeout, _} ->
+                Callback({error, timeout}, Acc);
+            {error, Error} ->
+                Callback({error, Error}, Acc)
+        end
     after
-        rexi_monitor:stop(RexiMon),
-        fabric_util:cleanup(Workers)
+        rexi_monitor:stop(RexiMon)
     end;
 
 
@@ -81,19 +78,32 @@ go(DbName, Options, QueryArgs, Callback, Acc0) ->
         Callback(timeout, Acc0)
     end.
 
+go(DbName, _Options, Workers, QueryArgs, Callback, Acc0) ->
+    #mrargs{limit = Limit, skip = Skip} = QueryArgs,
+    State = #collector{
+        db_name = DbName,
+        query_args = QueryArgs,
+        callback = Callback,
+        counters = fabric_dict:init(Workers, 0),
+        skip = Skip,
+        limit = Limit,
+        user_acc = Acc0
+    },
+    case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
+        State, infinity, 5000) of
+    {ok, NewState} ->
+        {ok, NewState#collector.user_acc};
+    {timeout, NewState} ->
+        Callback({error, timeout}, NewState#collector.user_acc);
+    {error, Resp} ->
+        {ok, Resp}
+    end.
+
 handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
-    fabric_view:remove_down_shards(State, NodeRef);
+    fabric_view:check_down_shards(State, NodeRef);
 
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
-    #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-        {error, Resp}
-    end;
+    fabric_view:handle_worker_exit(State, Worker, Reason);
 
 handle_message({meta, Meta0}, {Worker, From}, State) ->
     Tot = couch_util:get_value(total, Meta0, 0),
@@ -105,35 +115,30 @@ handle_message({meta, Meta0}, {Worker, From}, State) ->
         offset = Offset0,
         user_acc = AccIn
     } = State,
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate
-        gen_server:reply(From, stop),
-        {ok, State};
-    0 ->
-        gen_server:reply(From, ok),
-        Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
-        Total = Total0 + Tot,
-        Offset = Offset0 + Off,
-        case fabric_dict:any(0, Counters2) of
-        true ->
-            {ok, State#collector{
-                counters = Counters2,
-                total_rows = Total,
-                offset = Offset
-            }};
-        false ->
-            FinalOffset = erlang:min(Total, Offset+State#collector.skip),
-            Meta = [{total, Total}, {offset, FinalOffset}],
-            {Go, Acc} = Callback({meta, Meta}, AccIn),
-            {Go, State#collector{
-                counters = fabric_dict:decrement_all(Counters2),
-                total_rows = Total,
-                offset = FinalOffset,
-                user_acc = Acc
-            }}
-        end
+    % Assert that we don't have other messages from this
+    % worker when the total_and_offset message arrives.
+    0 = fabric_dict:lookup_element(Worker, Counters0),
+    rexi:stream_ack(From),
+    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    Total = Total0 + Tot,
+    Offset = Offset0 + Off,
+    case fabric_dict:any(0, Counters1) of
+    true ->
+        {ok, State#collector{
+            counters = Counters1,
+            total_rows = Total,
+            offset = Offset
+        }};
+    false ->
+        FinalOffset = erlang:min(Total, Offset+State#collector.skip),
+        Meta = [{total, Total}, {offset, FinalOffset}],
+        {Go, Acc} = Callback({meta, Meta}, AccIn),
+        {Go, State#collector{
+            counters = fabric_dict:decrement_all(Counters1),
+            total_rows = Total,
+            offset = FinalOffset,
+            user_acc = Acc
+        }}
     end;
 
 handle_message(#view_row{} = Row, {Worker, From}, State) ->


[07/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Separate is_owner logic from owner_of


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/2c341cf6
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/2c341cf6
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/2c341cf6

Branch: refs/heads/windsor-merge-121
Commit: 2c341cf65309ee463d09c03b0a609a4cc407f258
Parents: 1f91869
Author: Robert Newson <ro...@cloudant.com>
Authored: Sat Aug 10 19:05:49 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 10:53:37 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 31 +++++++++++++++----------------
 1 file changed, 15 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/2c341cf6/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 81b747b..0f426d3 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -373,7 +373,7 @@ calculate_start_seq(_Db, Seq) when is_integer(Seq) ->
 calculate_start_seq(Db, {Seq, Uuid, Node}) ->
     case couch_db:get_uuid(Db) == Uuid of
         true ->
-            case owner(Node, Seq, couch_db:get_epochs(Db)) of
+            case is_owner(Node, Seq, couch_db:get_epochs(Db)) of
                 true -> Seq;
                 false -> 0
             end;
@@ -383,16 +383,15 @@ calculate_start_seq(Db, {Seq, Uuid, Node}) ->
             0
     end.
 
-owner(Node, Seq, Epochs) ->
-    owner(Node, Seq, Epochs, infinity).
+is_owner(Node, Seq, Epochs) ->
+    Node =:= owner_of(Seq, Epochs).
 
-owner(_Node, _Seq, [], _HighSeq) ->
-    false;
-owner(Node, Seq, [{EpochNode, EpochSeq} | _Rest], HighSeq)
-  when Node =:= EpochNode andalso Seq < HighSeq andalso Seq > EpochSeq ->
-    true;
-owner(Node, Seq, [{_EpochNode, EpochSeq} | Rest], _HighSeq) ->
-    owner(Node, Seq, Rest, EpochSeq).
+owner_of(_Seq, []) ->
+    undefined;
+owner_of(Seq, [{EpochNode, EpochSeq} | _Rest]) when Seq > EpochSeq ->
+    EpochNode;
+owner_of(Seq, [_ | Rest]) ->
+    owner_of(Seq, Rest).
 
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
@@ -411,11 +410,11 @@ calculate_start_seq_test() ->
     %% return integer if we didn't get a vector.
     ?assertEqual(4, calculate_start_seq(#db{}, 4)).
 
-owner_test() ->
-    ?assertNot(owner(foo, 1, [])),
-    ?assert(owner(foo, 1, [{foo, 1}])),
-    ?assert(owner(foo, 50, [{bar, 100}, {foo, 1}])),
-    ?assert(owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])),
-    ?assert(owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])).
+is_owner_test() ->
+    ?assertNot(is_owner(foo, 1, [])),
+    ?assert(is_owner(foo, 1, [{foo, 1}])),
+    ?assert(is_owner(foo, 50, [{bar, 100}, {foo, 1}])),
+    ?assert(is_owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])),
+    ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])).
 
 -endif.


[05/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Monitor coordinator pids for update notifiers

We were leaking fabric_db_update_listener rexi workers. This uses the
new couch_event configurability to have the workers automatically exit
when the coordinator dies.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/61209201
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/61209201
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/61209201

Branch: refs/heads/windsor-merge-121
Commit: 61209201e3210927a7bac85348062b3c8908ccea
Parents: 400bce8
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Aug 8 12:19:32 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 10:51:27 2014 +0100

----------------------------------------------------------------------
 src/fabric_db_update_listener.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/61209201/src/fabric_db_update_listener.erl
----------------------------------------------------------------------
diff --git a/src/fabric_db_update_listener.erl b/src/fabric_db_update_listener.erl
index f3f8a53..9a0f2c6 100644
--- a/src/fabric_db_update_listener.erl
+++ b/src/fabric_db_update_listener.erl
@@ -70,7 +70,7 @@ start_update_notifier(DbNames) ->
     {Caller, Ref} = get(rexi_from),
     Notify = config:get("cloudant", "maintenance_mode", "false") /= "true",
     State = #cb_state{client_pid = Caller, client_ref = Ref, notify = Notify},
-    Options = [{dbnames, DbNames}],
+    Options = [{parent, Caller}, {dbnames, DbNames}],
     couch_event:listen(?MODULE, handle_db_event, State, Options).
 
 handle_db_event(_DbName, updated, #cb_state{notify = true} = St) ->


[16/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Handle bad_request message from worker

fabric_doc_update handles bad_request messages that can be sent
by its workers if they fail to parse a multipart/related document.

BugzID: 22593


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/674c211a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/674c211a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/674c211a

Branch: refs/heads/windsor-merge-121
Commit: 674c211a223f763c156b7948183299a0a829d280
Parents: 3804a0a
Author: Mike Wallace <mi...@googlemail.com>
Authored: Wed Aug 21 13:51:58 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 11:12:16 2014 +0100

----------------------------------------------------------------------
 src/fabric_doc_update.erl | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/674c211a/src/fabric_doc_update.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl
index 8ec12ac..ddae65d 100644
--- a/src/fabric_doc_update.erl
+++ b/src/fabric_doc_update.erl
@@ -87,7 +87,9 @@ handle_message({missing_stub, Stub}, _, _) ->
 handle_message({not_found, no_db_file} = X, Worker, Acc0) ->
     {_, _, _, GroupedDocs, _} = Acc0,
     Docs = couch_util:get_value(Worker, GroupedDocs),
-    handle_message({ok, [X || _D <- Docs]}, Worker, Acc0).
+    handle_message({ok, [X || _D <- Docs]}, Worker, Acc0);
+handle_message({bad_request, Msg}, _, _) ->
+    throw({bad_request, Msg}).
 
 force_reply(Doc, [], {_, W, Acc}) ->
     {error, W, [{Doc, {error, internal_server_error}} | Acc]};


[20/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Upgrade continuous changes feeds to new code

BugzID: 23154


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/2f1db98b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/2f1db98b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/2f1db98b

Branch: refs/heads/windsor-merge-121
Commit: 2f1db98b84040ffc3c9fdb5bf494af8b7ef51538
Parents: 5d00748
Author: Robert Newson <ro...@cloudant.com>
Authored: Tue Sep 17 23:33:30 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 11:13:34 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/2f1db98b/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index a6c468e..dff9d4a 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -14,6 +14,9 @@
 
 -export([go/5, pack_seqs/1, unpack_seqs/2]).
 
+%% exported for upgrade purposes.
+-export([keep_sending_changes/8]).
+
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
@@ -96,7 +99,7 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0)
             Callback({stop, LastSeq}, AccOut);
         _ ->
             {ok, AccTimeout} = Callback(timeout, AccOut),
-            keep_sending_changes(
+            ?MODULE:keep_sending_changes(
                 DbName,
                 Args#changes_args{limit=Limit2},
                 Callback,


[13/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Downgrade new-style "since" values unintentionally released into the wild


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/e47db787
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/e47db787
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/e47db787

Branch: refs/heads/windsor-merge-121
Commit: e47db787c512f76162e34d5c24a7b43d9f600850
Parents: 1649efc
Author: Robert Newson <ro...@cloudant.com>
Authored: Wed Aug 14 22:05:47 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 10:55:08 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/e47db787/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index fb8303f..cff28fd 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -339,7 +339,7 @@ do_unpack_seqs(Opaque, DbName) ->
     Unpacked = lists:flatmap(fun({Node, [A,B], Seq}) ->
         case mem3:get_shard(DbName, Node, [A,B]) of
         {ok, Shard} ->
-            [{Shard, Seq}];
+            [{Shard, seq(Seq)}];
         {error, not_found} ->
             []
         end


[44/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Replace read_repair logs with a metric

Successful read repair notices aren't overly informative. To reduce log
spam we'll just switch to using a metric and only log when we don't have
a successful response.

BugzId: 24262


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/49dd38c8
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/49dd38c8
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/49dd38c8

Branch: refs/heads/windsor-merge-121
Commit: 49dd38c8a39186e06a19e01757cf83ba7ae3fd82
Parents: f1d0c22
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Oct 17 15:35:39 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 17:22:13 2014 +0100

----------------------------------------------------------------------
 src/fabric_doc_open.erl      | 7 ++++++-
 src/fabric_doc_open_revs.erl | 7 ++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/49dd38c8/src/fabric_doc_open.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_open.erl b/src/fabric_doc_open.erl
index 77b6337..26bd2a3 100644
--- a/src/fabric_doc_open.erl
+++ b/src/fabric_doc_open.erl
@@ -120,7 +120,12 @@ read_repair(#acc{dbname=DbName, replies=Replies}) ->
         Ctx = #user_ctx{roles=[<<"_admin">>]},
         Opts = [replicated_changes, {user_ctx, Ctx}],
         Res = fabric:update_docs(DbName, Docs, Opts),
-        couch_log:notice("read_repair ~s ~s ~p", [DbName, Id, Res]),
+        case Res of
+            {ok, []} ->
+                ok;
+            _ ->
+                couch_log:notice("read_repair ~s ~s ~p", [DbName, Id, Res])
+        end,
         choose_reply(Docs);
     [] ->
         % Try hard to return some sort of information

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/49dd38c8/src/fabric_doc_open_revs.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_open_revs.erl b/src/fabric_doc_open_revs.erl
index 662990a..31d7616 100644
--- a/src/fabric_doc_open_revs.erl
+++ b/src/fabric_doc_open_revs.erl
@@ -167,7 +167,12 @@ maybe_execute_read_repair(Db, Docs) ->
     [#doc{id=Id} | _] = Docs,
     Ctx = #user_ctx{roles=[<<"_admin">>]},
     Res = fabric:update_docs(Db, Docs, [replicated_changes, {user_ctx,Ctx}]),
-    couch_log:notice("read_repair ~s ~s ~p", [Db, Id, Res]).
+    case Res of
+        {ok, []} ->
+            ok;
+        _ ->
+            couch_log:notice("read_repair ~s ~s ~p", [Db, Id, Res])
+    end.
 
 % hackery required so that not_found sorts first
 strip_not_found_missing([]) ->


[12/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Revert "Revert "Revert uuid in changes feeds temporarily""

This reverts commit 92190f71fecf4a22102154231a2de09ad910616c.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/1649efcb
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/1649efcb
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/1649efcb

Branch: refs/heads/windsor-merge-121
Commit: 1649efcbbf83149bb6b6654253bf019201de1860
Parents: d462044
Author: Robert Newson <ro...@cloudant.com>
Authored: Wed Aug 14 17:28:28 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 10:54:56 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 70 +++++--------------------------------------------
 1 file changed, 7 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/1649efcb/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 870ac57..c6ac263 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -29,19 +29,18 @@
 
 changes(DbName, #changes_args{} = Args, StartSeq) ->
     changes(DbName, [Args], StartSeq);
-changes(DbName, Options, StartVector) ->
+changes(DbName, Options, StartSeq) ->
     erlang:put(io_priority, {interactive, DbName}),
     #changes_args{dir=Dir} = Args = lists:keyfind(changes_args, 1, Options),
     case get_or_create_db(DbName, []) of
     {ok, Db} ->
-        StartSeq = calculate_start_seq(Db, StartVector),
         Enum = fun changes_enumerator/2,
         Opts = [{dir,Dir}],
         Acc0 = {Db, StartSeq, Args, Options},
         try
             {ok, {_, LastSeq, _, _}} =
                 couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0),
-            rexi:reply({complete, {LastSeq, couch_db:get_uuid(Db), node()}})
+            rexi:reply({complete, LastSeq})
         after
             couch_db:close(Db)
         end;
@@ -279,14 +278,11 @@ changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->
 
 changes_row(Db, #doc_info{id=Id, high_seq=Seq}=DI, Results, Del, true, Opts) ->
     Doc = doc_member(Db, DI, Opts),
-    Uuid = couch_db:get_uuid(Db),
-    #change{key={Seq, Uuid, node()}, id=Id, value=Results, doc=Doc, deleted=Del};
-changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) ->
-    Uuid = couch_db:get_uuid(Db),
-    #change{key={Seq, Uuid, node()}, id=Id, value=Results, deleted=true};
-changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) ->
-    Uuid = couch_db:get_uuid(Db),
-    #change{key={Seq, Uuid, node()}, id=Id, value=Results}.
+    #change{key=Seq, id=Id, value=Results, doc=Doc, deleted=Del};
+changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) ->
+    #change{key=Seq, id=Id, value=Results, deleted=true};
+changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) ->
+    #change{key=Seq, id=Id, value=Results}.
 
 doc_member(Shard, DocInfo, Opts) ->
     case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of
@@ -367,55 +363,3 @@ set_io_priority(DbName, Options) ->
         _ ->
             ok
     end.
-
-calculate_start_seq(_Db, Seq) when is_integer(Seq) ->
-    Seq;
-calculate_start_seq(Db, {Seq, Uuid, Node}) ->
-    case couch_db:get_uuid(Db) == Uuid of
-        true ->
-            case owner(Node, Seq, couch_db:get_epochs(Db)) of
-                true -> Seq;
-                false -> 0
-            end;
-        false ->
-            %% The file was rebuilt, most likely in a different
-            %% order, so rewind.
-            0
-    end.
-
-owner(Node, Seq, Epochs) ->
-    owner(Node, Seq, Epochs, infinity).
-
-owner(_Node, _Seq, [], _HighSeq) ->
-    false;
-owner(Node, Seq, [{EpochNode, EpochSeq} | _Rest], HighSeq)
-  when Node =:= EpochNode andalso Seq < HighSeq andalso Seq >= EpochSeq ->
-    true;
-owner(Node, Seq, [{_EpochNode, EpochSeq} | Rest], _HighSeq) ->
-    owner(Node, Seq, Rest, EpochSeq).
-
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-
-calculate_start_seq_test() ->
-    %% uuid mismatch is always a rewind.
-    Hdr1 = couch_db_header:new(),
-    Hdr2 = couch_db_header:set(Hdr1, [{uuid, uuid1}]),
-    ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, {1, uuid2, node1})),
-    %% uuid matches and seq is owned by node.
-    Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]),
-    ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, {2, uuid1, node1})),
-    %% uuids match but seq is not owned by node.
-    Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]),
-    ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, {3, uuid1, node1})),
-    %% return integer if we didn't get a vector.
-    ?assertEqual(4, calculate_start_seq(#db{}, 4)).
-
-owner_test() ->
-    ?assertNot(owner(foo, 1, [])),
-    ?assert(owner(foo, 1, [{foo, 1}])),
-    ?assert(owner(foo, 50, [{bar, 100}, {foo, 1}])),
-    ?assert(owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])),
-    ?assert(owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])).
-
--endif.


[24/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Change write quorum error logs into metrics

The global_changes work uncovered the fact that the common "write quorum
failed" log messages happen for run of the mill conflict errors. Rather
than try and change the logic around to fix the error this just adds
three metrics to cover the events so that we can monitor it in the
future.

BugzId: 23977


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/feb56949
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/feb56949
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/feb56949

Branch: refs/heads/windsor-merge-121
Commit: feb5694941f7adc310f937b71f5cd9e834c20990
Parents: 455afa7
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Oct 8 12:03:23 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 13:43:28 2014 +0100

----------------------------------------------------------------------
 src/fabric_doc_update.erl | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/feb56949/src/fabric_doc_update.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl
index 9e2ce50..91a3c23 100644
--- a/src/fabric_doc_update.erl
+++ b/src/fabric_doc_update.erl
@@ -112,17 +112,22 @@ force_reply(Doc, [FirstReply|_] = Replies, {Health, W, Acc}) ->
     {true, Reply} ->
         {Health, W, [{Doc,Reply} | Acc]};
     false ->
-        couch_log:warning("write quorum (~p) failed for ~s", [W, Doc#doc.id]),
         case [Reply || {ok, Reply} <- Replies] of
         [] ->
             % check if all errors are identical, if so inherit health
             case lists:all(fun(E) -> E =:= FirstReply end, Replies) of
             true ->
+                CounterKey = [fabric, doc_update, errors],
+                margaret_counter:increment(CounterKey),
                 {Health, W, [{Doc, FirstReply} | Acc]};
             false ->
+                CounterKey = [fabric, doc_update, mismatched_errors],
+                margaret_counter:increment(CounterKey),
                 {error, W, [{Doc, FirstReply} | Acc]}
             end;
         [AcceptedRev | _] ->
+            CounterKey = [fabric, doc_update, write_quorum_errors],
+            margaret_counter:increment(CounterKey),
             NewHealth = case Health of ok -> accepted; _ -> Health end,
             {NewHealth, W, [{Doc, {accepted,AcceptedRev}} | Acc]}
         end


[15/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Fix changes shard replacement

We were unwittingly falling victim to list comprehensions swallowing
exceptions again. The result this time was that anytime we needed to
replace a changes shard we were adding the entire shard map. This means
that any time we were resetting the feed it's possible we could have
reset the feed for all shard ranges depending on the rexi worker race.

BugzId: 22808


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/3804a0a4
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/3804a0a4
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/3804a0a4

Branch: refs/heads/windsor-merge-121
Commit: 3804a0a4a944e12716aeeb5a7a4342b0b61c5050
Parents: f371174
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Sep 6 06:17:44 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 11:12:00 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/3804a0a4/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index cff28fd..293541e 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -362,7 +362,8 @@ do_unpack_seqs(Opaque, DbName) ->
         true ->
             Unpacked;
         false ->
-            Ranges = lists:usort([R || #shard{range=R} <- Unpacked]),
+            Extract = fun({Shard, _Seq}) -> Shard#shard.range end,
+            Ranges = lists:usort(lists:map(Extract, Unpacked)),
             Filter = fun(S) -> not lists:member(S#shard.range, Ranges) end,
             Replacements = lists:filter(Filter, mem3:shards(DbName)),
             Unpacked ++ [{R, 0} || R <- Replacements]


[40/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Provide replacement shards in map views

BugzId: 20423


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/5d9a0e55
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/5d9a0e55
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/5d9a0e55

Branch: refs/heads/windsor-merge-121
Commit: 5d9a0e55926c8a70dd923d7e408932cd7ab262ad
Parents: 4f639ce
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Sep 12 10:22:00 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 16:19:14 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_map.erl | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/5d9a0e55/src/fabric_view_map.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index cf70568..d2c010a 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -25,11 +25,15 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
 
 go(DbName, DDoc, View, Args, Callback, Acc) ->
     Shards = fabric_view:get_shards(DbName, Args),
-    Workers0 = fabric_util:submit_jobs(
-            Shards, fabric_rpc, map_view, [DDoc, View, Args]),
+    Repls = fabric_view:get_shard_replacements(DbName, Shards),
+    RPCArgs = [DDoc, View, Args],
+    StartFun = fun(Shard) ->
+        hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
+    end,
+    Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, map_view, RPCArgs),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
-        case fabric_util:stream_start(Workers0, #shard.ref) of
+        case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
             {ok, Workers} ->
                 try
                     go(DbName, Workers, Args, Callback, Acc)
@@ -74,11 +78,6 @@ handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
     fabric_view:handle_worker_exit(State, Worker, Reason);
 
-handle_message({rexi_EXIT, Reason}, _, State) ->
-    #collector{callback=Callback, user_acc=Acc} = State,
-    {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-    {error, Resp};
-
 handle_message({meta, Meta0}, {Worker, From}, State) ->
     Tot = couch_util:get_value(total, Meta0, 0),
     Off = couch_util:get_value(offset, Meta0, 0),


[03/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Revert uuid in changes feeds temporarily

Reverts:
b271bcceff4dddc255af3e4ae57a1a36d6708b8a 19133-fix-code-upgrade
7102f90a85c80b02229a429185b40817182fbfcb 18726-use-uuid-generation-in-since


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/9aafe7b3
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/9aafe7b3
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/9aafe7b3

Branch: refs/heads/windsor-merge-121
Commit: 9aafe7b39069fc8c1534becf8fd434a717bab07f
Parents: 518f76d
Author: Robert Newson <ro...@cloudant.com>
Authored: Tue Jul 30 23:23:23 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 10:51:10 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 70 +++++--------------------------------------------
 1 file changed, 7 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/9aafe7b3/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 870ac57..c6ac263 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -29,19 +29,18 @@
 
 changes(DbName, #changes_args{} = Args, StartSeq) ->
     changes(DbName, [Args], StartSeq);
-changes(DbName, Options, StartVector) ->
+changes(DbName, Options, StartSeq) ->
     erlang:put(io_priority, {interactive, DbName}),
     #changes_args{dir=Dir} = Args = lists:keyfind(changes_args, 1, Options),
     case get_or_create_db(DbName, []) of
     {ok, Db} ->
-        StartSeq = calculate_start_seq(Db, StartVector),
         Enum = fun changes_enumerator/2,
         Opts = [{dir,Dir}],
         Acc0 = {Db, StartSeq, Args, Options},
         try
             {ok, {_, LastSeq, _, _}} =
                 couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0),
-            rexi:reply({complete, {LastSeq, couch_db:get_uuid(Db), node()}})
+            rexi:reply({complete, LastSeq})
         after
             couch_db:close(Db)
         end;
@@ -279,14 +278,11 @@ changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->
 
 changes_row(Db, #doc_info{id=Id, high_seq=Seq}=DI, Results, Del, true, Opts) ->
     Doc = doc_member(Db, DI, Opts),
-    Uuid = couch_db:get_uuid(Db),
-    #change{key={Seq, Uuid, node()}, id=Id, value=Results, doc=Doc, deleted=Del};
-changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) ->
-    Uuid = couch_db:get_uuid(Db),
-    #change{key={Seq, Uuid, node()}, id=Id, value=Results, deleted=true};
-changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) ->
-    Uuid = couch_db:get_uuid(Db),
-    #change{key={Seq, Uuid, node()}, id=Id, value=Results}.
+    #change{key=Seq, id=Id, value=Results, doc=Doc, deleted=Del};
+changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) ->
+    #change{key=Seq, id=Id, value=Results, deleted=true};
+changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) ->
+    #change{key=Seq, id=Id, value=Results}.
 
 doc_member(Shard, DocInfo, Opts) ->
     case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of
@@ -367,55 +363,3 @@ set_io_priority(DbName, Options) ->
         _ ->
             ok
     end.
-
-calculate_start_seq(_Db, Seq) when is_integer(Seq) ->
-    Seq;
-calculate_start_seq(Db, {Seq, Uuid, Node}) ->
-    case couch_db:get_uuid(Db) == Uuid of
-        true ->
-            case owner(Node, Seq, couch_db:get_epochs(Db)) of
-                true -> Seq;
-                false -> 0
-            end;
-        false ->
-            %% The file was rebuilt, most likely in a different
-            %% order, so rewind.
-            0
-    end.
-
-owner(Node, Seq, Epochs) ->
-    owner(Node, Seq, Epochs, infinity).
-
-owner(_Node, _Seq, [], _HighSeq) ->
-    false;
-owner(Node, Seq, [{EpochNode, EpochSeq} | _Rest], HighSeq)
-  when Node =:= EpochNode andalso Seq < HighSeq andalso Seq >= EpochSeq ->
-    true;
-owner(Node, Seq, [{_EpochNode, EpochSeq} | Rest], _HighSeq) ->
-    owner(Node, Seq, Rest, EpochSeq).
-
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-
-calculate_start_seq_test() ->
-    %% uuid mismatch is always a rewind.
-    Hdr1 = couch_db_header:new(),
-    Hdr2 = couch_db_header:set(Hdr1, [{uuid, uuid1}]),
-    ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, {1, uuid2, node1})),
-    %% uuid matches and seq is owned by node.
-    Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]),
-    ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, {2, uuid1, node1})),
-    %% uuids match but seq is not owned by node.
-    Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]),
-    ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, {3, uuid1, node1})),
-    %% return integer if we didn't get a vector.
-    ?assertEqual(4, calculate_start_seq(#db{}, 4)).
-
-owner_test() ->
-    ?assertNot(owner(foo, 1, [])),
-    ?assert(owner(foo, 1, [{foo, 1}])),
-    ?assert(owner(foo, 50, [{bar, 100}, {foo, 1}])),
-    ?assert(owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])),
-    ?assert(owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])).
-
--endif.


[27/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Add fabric_dict:fetch_keys/1


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/03086f38
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/03086f38
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/03086f38

Branch: refs/heads/windsor-merge-121
Commit: 03086f3833745888812c3bd96c92df80094dbfd5
Parents: 44185ed
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Oct 4 13:16:03 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 13:44:11 2014 +0100

----------------------------------------------------------------------
 src/fabric_dict.erl | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/03086f38/src/fabric_dict.erl
----------------------------------------------------------------------
diff --git a/src/fabric_dict.erl b/src/fabric_dict.erl
index 20700c0..f88ca97 100644
--- a/src/fabric_dict.erl
+++ b/src/fabric_dict.erl
@@ -22,6 +22,9 @@ init(Keys, InitialValue) ->
 is_key(Key, Dict) ->
     orddict:is_key(Key, Dict).
 
+fetch_keys(Dict) ->
+    orddict:fetch_keys(Dict).
+
 decrement_all(Dict) ->
     [{K,V-1} || {K,V} <- Dict].
 


[39/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Add utility function to find shard replacements

Streaming RPC coordinators can use this function to generate a list of
shard replacements that is suitable to be passed to
fabric_util:stream_start/4.

BugzId: 20423


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/4f639cea
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/4f639cea
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/4f639cea

Branch: refs/heads/windsor-merge-121
Commit: 4f639cead248356f24f1d9f1d95f1fb2dec7630d
Parents: 3523efd
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Sep 12 13:30:40 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 16:14:20 2014 +0100

----------------------------------------------------------------------
 src/fabric_view.erl | 31 ++++++++++++++++++++++++++++++-
 1 file changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/4f639cea/src/fabric_view.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view.erl b/src/fabric_view.erl
index 690625b..7cd5321 100644
--- a/src/fabric_view.erl
+++ b/src/fabric_view.erl
@@ -14,7 +14,8 @@
 
 -export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
     transform_row/1, keydict/1, extract_view/4, get_shards/2,
-    check_down_shards/2, handle_worker_exit/3]).
+    check_down_shards/2, handle_worker_exit/3,
+    get_shard_replacements/2]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -305,6 +306,34 @@ get_shards(DbName, #mrargs{stale=Stale})
 get_shards(DbName, #mrargs{stale=false}) ->
     mem3:shards(DbName).
 
+get_shard_replacements(DbName, UsedShards) ->
+    % We only want to generate a replacements list from shards
+    % that aren't already used.
+    AllLiveShards = mem3:live_shards(DbName, [node() | nodes()]),
+    UnusedShards = AllLiveShards -- UsedShards,
+
+    % If we have more than one copy of a range then we don't
+    % want to try and add a replacement to any copy.
+    RangeCounts = lists:foldl(fun(#shard{range=R}, Acc) ->
+        dict:update_counter(R, 1, Acc)
+    end, dict:new(), UsedShards),
+
+    % For each seq shard range with a count of 1, find any
+    % possible replacements from the unused shards. The
+    % replacement list is keyed by range.
+    lists:foldl(fun(#shard{range=Range}, Acc) ->
+        case dict:find(Range, RangeCounts) of
+            {ok, 1} ->
+                Repls = [S || S <- UnusedShards, S#shard.range =:= Range],
+                % Only keep non-empty lists of replacements
+                if Repls == [] -> Acc; true ->
+                    [{Range, Repls} | Acc]
+                end;
+            _ ->
+                Acc
+        end
+    end, [], UsedShards).
+
 % unit test
 is_progress_possible_test() ->
     EndPoint = 2 bsl 31,


[45/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Make all timeouts configurable

BugzID: 24302


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/209a2004
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/209a2004
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/209a2004

Branch: refs/heads/windsor-merge-121
Commit: 209a2004958f93c53b4e19dc356a7d03bed28bdc
Parents: 49dd38c
Author: Robert Newson <ro...@cloudant.com>
Authored: Mon Oct 21 18:05:11 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 17:22:34 2014 +0100

----------------------------------------------------------------------
 src/fabric_doc_attachments.erl |  6 ++++--
 src/fabric_util.erl            | 13 +++++++++++--
 src/fabric_view_all_docs.erl   |  6 ++++--
 3 files changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/209a2004/src/fabric_doc_attachments.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_attachments.erl b/src/fabric_doc_attachments.erl
index af56137..a703db9 100644
--- a/src/fabric_doc_attachments.erl
+++ b/src/fabric_doc_attachments.erl
@@ -34,11 +34,12 @@ receiver(Req, Length) when is_integer(Length) ->
     Middleman = spawn(fun() -> middleman(Req, Length) end),
     fun() ->
         Middleman ! {self(), gimme_data},
+        Timeout = fabric_util:attachments_timeout(),
         receive
             {Middleman, Data} ->
                 rexi:reply(attachment_chunk_received),
                 iolist_to_binary(Data)
-        after 600000 ->
+        after Timeout ->
             exit(timeout)
         end
     end;
@@ -64,6 +65,7 @@ maybe_send_continue(#httpd{mochi_req = MochiReq} = Req) ->
 
 write_chunks(MiddleMan, ChunkFun) ->
     MiddleMan ! {self(), gimme_data},
+    Timeout = fabric_util:attachments_timeout(),
     receive
     {MiddleMan, ChunkRecordList} ->
         rexi:reply(attachment_chunk_received),
@@ -71,7 +73,7 @@ write_chunks(MiddleMan, ChunkFun) ->
         continue -> write_chunks(MiddleMan, ChunkFun);
         done -> ok
         end
-    after 600000 ->
+    after Timeout ->
         exit(timeout)
     end.
 

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/209a2004/src/fabric_util.erl
----------------------------------------------------------------------
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index fb6cd8b..344ead7 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -15,7 +15,7 @@
 -export([submit_jobs/3, submit_jobs/4, cleanup/1, recv/4, get_db/1, get_db/2, error_info/1,
         update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
         remove_down_workers/2]).
--export([request_timeout/0]).
+-export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0]).
 -export([stream_start/2, stream_start/4]).
 
 -include_lib("fabric/include/fabric.hrl").
@@ -138,7 +138,16 @@ recv(Workers, Keypos, Fun, Acc0) ->
     rexi_utils:recv(Workers, Keypos, Fun, Acc0, request_timeout(), infinity).
 
 request_timeout() ->
-    case config:get("fabric", "request_timeout", "60000") of
+    timeout("request", "60000").
+
+all_docs_timeout() ->
+    timeout("all_docs", "10000").
+
+attachments_timeout() ->
+    timeout("attachments", "600000").
+
+timeout(Type, Default) ->
+    case config:get("fabric", Type ++ "_timeout", Default) of
         "infinity" -> infinity;
         N -> list_to_integer(N)
     end.

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/209a2004/src/fabric_view_all_docs.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl
index 8dc9173..28a1f91 100644
--- a/src/fabric_view_all_docs.erl
+++ b/src/fabric_view_all_docs.erl
@@ -68,13 +68,14 @@ go(DbName, Options, QueryArgs, Callback, Acc0) ->
         true -> lists:sublist(Keys2, Limit);
         false -> Keys2
     end,
+    Timeout = fabric_util:all_docs_timeout(),
     receive {'DOWN', Ref0, _, _, {ok, TotalRows}} ->
         {ok, Acc1} = Callback({meta, [{total, TotalRows}]}, Acc0),
         {ok, Acc2} = doc_receive_loop(
             Keys3, queue:new(), SpawnFun, MaxJobs, Callback, Acc1
         ),
         Callback(complete, Acc2)
-    after 10000 ->
+    after Timeout ->
         Callback(timeout, Acc0)
     end.
 
@@ -176,6 +177,7 @@ doc_receive_loop(Keys, Pids, SpawnFun, MaxJobs, Callback, AccIn) ->
         doc_receive_loop(RKeys, Pids1, SpawnFun, MaxJobs, Callback, AccIn);
     _ ->
         {{value, {Pid, Ref}}, RestPids} = queue:out(Pids),
+        Timeout = fabric_util:all_docs_timeout(),
         receive {'DOWN', Ref, process, Pid, #view_row{} = Row} ->
             case Callback(fabric_view:transform_row(Row), AccIn) of
             {ok, Acc} ->
@@ -186,7 +188,7 @@ doc_receive_loop(Keys, Pids, SpawnFun, MaxJobs, Callback, AccIn) ->
                 cancel_read_pids(RestPids),
                 {ok, Acc}
             end
-        after 10000 ->
+        after Timeout ->
             timeout
         end
     end.


[10/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Gracefully terminate continuous changes feeds

When maintenance_mode is set to "true" or "nolb", end all active continuous
changes feeds gracefully with a last_seq chunk and clean TCP
termination.

BugzID: 21618


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/a698d48c
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/a698d48c
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/a698d48c

Branch: refs/heads/windsor-merge-121
Commit: a698d48c3ebe73fdb64977bcae41f4ae2de4094b
Parents: bef303b
Author: Robert Newson <ro...@cloudant.com>
Authored: Mon Aug 12 02:12:28 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 10:54:33 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/a698d48c/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index ddfe45f..fb8303f 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -76,7 +76,9 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0)
     {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn, Timeout),
     #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
     LastSeq = pack_seqs(NewSeqs),
-    if Limit > Limit2, Feed == "longpoll" ->
+    MaintenanceMode = config:get("cloudant", "maintenance_mode"),
+    if Limit > Limit2, Feed == "longpoll";
+      MaintenanceMode == "true"; MaintenanceMode == "nolb" ->
         Callback({stop, LastSeq}, AccOut);
     true ->
         WaitForUpdate = wait_db_updated(UpListen),


[34/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Remove test for old reduce view behavior

This test is no longer valid because it's checking that a complete
message removes other workers from the same range which no longer
happens. This is because fabric_util:stream_start/2 now removes all
workers and returns us a single complete shard ring.

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/42e069a3
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/42e069a3
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/42e069a3

Branch: refs/heads/windsor-merge-121
Commit: 42e069a38bd1cd1bc91a8bd8339cc3196762b486
Parents: c44406d
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Sep 4 16:23:02 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 16:07:13 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_reduce.erl | 15 ---------------
 1 file changed, 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/42e069a3/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 42b889b..16d6fb1 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -124,21 +124,6 @@ handle_message(complete, Worker, #collector{counters = Counters0} = State) ->
     C1 = fabric_dict:update_counter(Worker, 1, Counters0),
     fabric_view:maybe_send_row(State#collector{counters = C1}).
 
-complete_worker_test() ->
-    meck:new(config),
-    meck:expect(config, get, fun("rexi","server_per_node",_) -> rexi_server end),
-    Shards =
-        mem3_util:create_partition_map("foo",3,3,[node(),node(),node()]),
-    Workers = lists:map(fun(#shard{} = Shard) ->
-                            Ref = make_ref(),
-                            Shard#shard{ref = Ref}
-                        end,
-                        Shards),
-    State = #collector{counters=fabric_dict:init(Workers,0)},
-    {ok, NewState} = handle_message(complete, lists:nth(2,Workers), State),
-    meck:unload(config),
-    ?assertEqual(orddict:size(NewState#collector.counters),length(Workers) - 2).
-
 os_proc_needed(<<"_", _/binary>>) -> false;
 os_proc_needed(_) -> true.
 


[32/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Update map view coordinator to use new RPC APIs

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/ab1a71eb
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/ab1a71eb
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/ab1a71eb

Branch: refs/heads/windsor-merge-121
Commit: ab1a71ebef30664191c99cf8b924c188f3713bee
Parents: ad460e4
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Sep 6 07:26:52 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 14:44:30 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_map.erl | 100 +++++++++++++++++++++++--------------------
 1 file changed, 54 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/ab1a71eb/src/fabric_view_map.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index eb30179..cf70568 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -23,9 +23,29 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
     {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
     go(DbName, DDoc, View, Args, Callback, Acc0);
 
-go(DbName, DDoc, View, Args, Callback, Acc0) ->
+go(DbName, DDoc, View, Args, Callback, Acc) ->
     Shards = fabric_view:get_shards(DbName, Args),
-    Workers = fabric_util:submit_jobs(Shards, map_view, [DDoc, View, Args]),
+    Workers0 = fabric_util:submit_jobs(
+            Shards, fabric_rpc, map_view, [DDoc, View, Args]),
+    RexiMon = fabric_util:create_monitors(Workers0),
+    try
+        case fabric_util:stream_start(Workers0, #shard.ref) of
+            {ok, Workers} ->
+                try
+                    go(DbName, Workers, Args, Callback, Acc)
+                after
+                    fabric_util:cleanup(Workers)
+                end;
+            {timeout, _} ->
+                Callback({error, timeout}, Acc);
+            {error, Error} ->
+                Callback({error, Error}, Acc)
+        end
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+go(DbName, Workers, Args, Callback, Acc0) ->
     #mrargs{limit = Limit, skip = Skip, keys = Keys} = Args,
     State = #collector{
         db_name=DbName,
@@ -38,8 +58,7 @@ go(DbName, DDoc, View, Args, Callback, Acc0) ->
         sorted = Args#mrargs.sorted,
         user_acc = Acc0
     },
-    RexiMon = fabric_util:create_monitors(Workers),
-    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
+    case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
         State, infinity, 1000 * 60 * 60) of
     {ok, NewState} ->
         {ok, NewState#collector.user_acc};
@@ -47,24 +66,18 @@ go(DbName, DDoc, View, Args, Callback, Acc0) ->
         Callback({error, timeout}, NewState#collector.user_acc);
     {error, Resp} ->
         {ok, Resp}
-    after
-        rexi_monitor:stop(RexiMon),
-        fabric_util:cleanup(Workers)
     end.
 
 handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
-    fabric_view:remove_down_shards(State, NodeRef);
+    fabric_view:check_down_shards(State, NodeRef);
 
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
-    #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-        {error, Resp}
-    end;
+    fabric_view:handle_worker_exit(State, Worker, Reason);
+
+handle_message({rexi_EXIT, Reason}, _, State) ->
+    #collector{callback=Callback, user_acc=Acc} = State,
+    {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
+    {error, Resp};
 
 handle_message({meta, Meta0}, {Worker, From}, State) ->
     Tot = couch_util:get_value(total, Meta0, 0),
@@ -76,35 +89,30 @@ handle_message({meta, Meta0}, {Worker, From}, State) ->
         offset = Offset0,
         user_acc = AccIn
     } = State,
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate
-        gen_server:reply(From, stop),
-        {ok, State};
-    0 ->
-        gen_server:reply(From, ok),
-        Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
-        Total = Total0 + Tot,
-        Offset = Offset0 + Off,
-        case fabric_dict:any(0, Counters2) of
-        true ->
-            {ok, State#collector{
-                counters = Counters2,
-                total_rows = Total,
-                offset = Offset
-            }};
-        false ->
-            FinalOffset = erlang:min(Total, Offset+State#collector.skip),
-            Meta = [{total, Total}, {offset, FinalOffset}],
-            {Go, Acc} = Callback({meta, Meta}, AccIn),
-            {Go, State#collector{
-                counters = fabric_dict:decrement_all(Counters2),
-                total_rows = Total,
-                offset = FinalOffset,
-                user_acc = Acc
-            }}
-        end
+    % Assert that we don't have other messages from this
+    % worker when the total_and_offset message arrives.
+    0 = fabric_dict:lookup_element(Worker, Counters0),
+    rexi:stream_ack(From),
+    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    Total = Total0 + Tot,
+    Offset = Offset0 + Off,
+    case fabric_dict:any(0, Counters1) of
+    true ->
+        {ok, State#collector{
+            counters = Counters1,
+            total_rows = Total,
+            offset = Offset
+        }};
+    false ->
+        FinalOffset = erlang:min(Total, Offset+State#collector.skip),
+        Meta = [{total, Total}, {offset, FinalOffset}],
+        {Go, Acc} = Callback({meta, Meta}, AccIn),
+        {Go, State#collector{
+            counters = fabric_dict:decrement_all(Counters1),
+            total_rows = Total,
+            offset = FinalOffset,
+            user_acc = Acc
+        }}
     end;
 
 handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) ->


[18/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Fix hot upgrade for new since values


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/60e93b0b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/60e93b0b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/60e93b0b

Branch: refs/heads/windsor-merge-121
Commit: 60e93b0b485b40f09b6263e2c58250a6d6a0ece6
Parents: d9cf946
Author: Robert Newson <ro...@cloudant.com>
Authored: Mon Sep 16 22:17:49 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 11:12:34 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl          | 9 ++++++---
 src/fabric_view_changes.erl | 1 +
 2 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/60e93b0b/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 20d7a5a..7b4473f 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -277,13 +277,14 @@ changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->
         {Go, {Db, Seq, Args, Options}}
     end.
 
+% TODO change to {Seq, uuid(Db)}
 changes_row(Db, #doc_info{id=Id, high_seq=Seq}=DI, Results, Del, true, Opts) ->
     Doc = doc_member(Db, DI, Opts),
-    #change{key={Seq, uuid(Db)}, id=Id, value=Results, doc=Doc, deleted=Del};
+    #change{key={Seq, uuid(Db), []}, id=Id, value=Results, doc=Doc, deleted=Del};
 changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) ->
-    #change{key={Seq, uuid(Db)}, id=Id, value=Results, deleted=true};
+    #change{key={Seq, uuid(Db), []}, id=Id, value=Results, deleted=true};
 changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) ->
-    #change{key={Seq, uuid(Db)}, id=Id, value=Results}.
+    #change{key={Seq, uuid(Db), []}, id=Id, value=Results}.
 
 doc_member(Shard, DocInfo, Opts) ->
     case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of
@@ -367,6 +368,8 @@ set_io_priority(DbName, Options) ->
 
 calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) ->
     Seq;
+calculate_start_seq(Db, Node, {Seq, Uuid, _}) -> % remove me
+    calculate_start_seq(Db, Node, {Seq, Uuid});
 calculate_start_seq(Db, Node, {Seq, Uuid}) ->
     case is_prefix(Uuid, couch_db:get_uuid(Db)) of
         true ->

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/60e93b0b/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 4e0cd9e..a54748a 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -297,6 +297,7 @@ pack_seqs(Workers) ->
     Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])),
     [SeqSum, Opaque].
 
+seq({Seq, _Uuid, _Node}) -> Seq; % remove me
 seq({Seq, _Uuid}) -> Seq;
 seq(Seq)          -> Seq.
 


[08/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Add negative test for fencepost


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/a8a458b8
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/a8a458b8
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/a8a458b8

Branch: refs/heads/windsor-merge-121
Commit: a8a458b8b1b4a7032b1f5a49927ee384a99ce019
Parents: 2c341cf
Author: Robert Newson <ro...@cloudant.com>
Authored: Sat Aug 10 19:11:29 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 10:54:20 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/a8a458b8/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 0f426d3..825691b 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -412,7 +412,8 @@ calculate_start_seq_test() ->
 
 is_owner_test() ->
     ?assertNot(is_owner(foo, 1, [])),
-    ?assert(is_owner(foo, 1, [{foo, 1}])),
+    ?assertNot(is_owner(foo, 1, [{foo, 1}])),
+    ?assert(is_owner(foo, 2, [{foo, 1}])),
     ?assert(is_owner(foo, 50, [{bar, 100}, {foo, 1}])),
     ?assert(is_owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])),
     ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])).


[33/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Update reduce view coordinator to use new RPC APIs

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/c44406d0
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/c44406d0
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/c44406d0

Branch: refs/heads/windsor-merge-121
Commit: c44406d0057d35b44b92f9c6ec4e3c749ba269c5
Parents: ab1a71e
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Sep 4 16:00:01 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 16:06:58 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_reduce.erl | 80 ++++++++++++++++++++---------------------
 1 file changed, 40 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/c44406d0/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 12bed6d..42b889b 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -19,11 +19,35 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
-go(DbName, DDoc, VName, Args, Callback, Acc0, {red, {_, Lang, _}, _}=VInfo) ->
-    Shards = fabric_view:get_shards(DbName, Args),
-    Workers = fabric_util:submit_jobs(Shards, reduce_view, [DDoc, VName, Args]),
+go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) ->
+    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
+    go(DbName, DDoc, View, Args, Callback, Acc0, VInfo);
+
+go(DbName, DDoc, VName, Args, Callback, Acc, {red, {_, Lang, _}, _}=VInfo) ->
     RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
-    RexiMon = fabric_util:create_monitors(Workers),
+    Workers0 = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
+        Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,DDoc,VName,Args]}),
+        Shard#shard{ref = Ref}
+    end, fabric_view:get_shards(DbName, Args)),
+    RexiMon = fabric_util:create_monitors(Workers0),
+    try
+        case fabric_util:stream_start(Workers0, #shard.ref) of
+            {ok, Workers} ->
+                try
+                    go2(DbName, Workers, Lang, RedSrc, Args, Callback, Acc)
+                after
+                    fabric_util:cleanup(Workers)
+                end;
+            {timeout, _} ->
+                Callback({error, timeout}, Acc);
+            {error, Error} ->
+                Callback({error, Error}, Acc)
+        end
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+go2(DbName, Workers, Lang, RedSrc, Args, Callback, Acc0) ->
     #mrargs{limit = Limit, skip = Skip, keys = Keys} = Args,
     OsProc = case os_proc_needed(RedSrc) of
         true -> couch_query_servers:get_os_process(Lang);
@@ -52,27 +76,16 @@ go(DbName, DDoc, VName, Args, Callback, Acc0, {red, {_, Lang, _}, _}=VInfo) ->
     {error, Resp} ->
         {ok, Resp}
     after
-        rexi_monitor:stop(RexiMon),
-        fabric_util:cleanup(Workers),
-        case State#collector.os_proc of
-            nil -> ok;
-            OsProc -> catch couch_query_servers:ret_os_process(OsProc)
+        if OsProc == nil -> ok; true ->
+            catch couch_query_servers:ret_os_process(OsProc)
         end
     end.
 
 handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
-    fabric_view:remove_down_shards(State, NodeRef);
+    fabric_view:check_down_shards(State, NodeRef);
 
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
-    #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-        {error, Resp}
-    end;
+    fabric_view:handle_worker_exit(State, Worker, Reason);
 
 %% HACK: this just sends meta once. Instead we should move the counter logic
 %% from the #view_row handle_message below into this function and and pass the
@@ -100,29 +113,16 @@ handle_message({meta, Meta}, {_Worker, From}, State) ->
 
 handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
     #collector{counters = Counters0, rows = Rows0} = State,
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate it
-        gen_server:reply(From, stop),
-        {ok, State};
-    _ ->
-        Rows = dict:append(Key, Row#view_row{worker={Worker, From}}, Rows0),
-        C1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        % TODO time this call, if slow don't do it every time
-        C2 = fabric_view:remove_overlapping_shards(Worker, C1),
-        State1 = State#collector{rows=Rows, counters=C2},
-        fabric_view:maybe_send_row(State1)
-    end;
+    true = fabric_dict:is_key(Worker, Counters0),
+    Rows = dict:append(Key, Row#view_row{worker={Worker, From}}, Rows0),
+    C1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    State1 = State#collector{rows=Rows, counters=C1},
+    fabric_view:maybe_send_row(State1);
 
 handle_message(complete, Worker, #collector{counters = Counters0} = State) ->
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        {ok, State};
-    _ ->
-        C1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        C2 = fabric_view:remove_overlapping_shards(Worker, C1),
-        fabric_view:maybe_send_row(State#collector{counters = C2})
-    end.
+    true = fabric_dict:is_key(Worker, Counters0),
+    C1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    fabric_view:maybe_send_row(State#collector{counters = C1}).
 
 complete_worker_test() ->
     meck:new(config),


[38/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Allow shard replacements in stream_start

This adds fabric_util:stream_start/4 which accepts a start function and
a proplist of replacement shards keyed by range. If a worker exits due
to maintenance mode and prevents progress from being made, the list of
replacement shards is searched for any replacements for the range. If
any are found the start function is called for each possible
replacement. The start function takes the possible replacement and
should return the new worker.

BugzId: 20423


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/3523efde
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/3523efde
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/3523efde

Branch: refs/heads/windsor-merge-121
Commit: 3523efde8bde945634e398d586fd79e948c9187c
Parents: fdc0c81
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Sep 12 10:23:48 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 16:13:44 2014 +0100

----------------------------------------------------------------------
 src/fabric_util.erl | 82 +++++++++++++++++++++++++++++++++++-------------
 1 file changed, 60 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/3523efde/src/fabric_util.erl
----------------------------------------------------------------------
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index f11abe3..fb6cd8b 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -16,13 +16,19 @@
         update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
         remove_down_workers/2]).
 -export([request_timeout/0]).
--export([stream_start/2]).
+-export([stream_start/2, stream_start/4]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
+-record(stream_acc, {
+    workers,
+    start_fun,
+    replacements
+}).
+
 remove_down_workers(Workers, BadNode) ->
     Filter = fun(#shard{node = Node}, _) -> Node =/= BadNode end,
     NewWorkers = fabric_dict:filter(Filter, Workers),
@@ -45,12 +51,19 @@ submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
 cleanup(Workers) ->
     [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
 
-stream_start(Workers0, Keypos) ->
+stream_start(Workers, Keypos) ->
+    stream_start(Workers, Keypos, undefined, undefined).
+
+stream_start(Workers0, Keypos, StartFun, Replacements) ->
     Fun = fun handle_stream_start/3,
-    Acc = fabric_dict:init(Workers0, waiting),
+    Acc = #stream_acc{
+        workers = fabric_dict:init(Workers0, waiting),
+        start_fun = StartFun,
+        replacements = Replacements
+    },
     Timeout = request_timeout(),
     case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
-        {ok, Workers} ->
+        {ok, #stream_acc{workers=Workers}} ->
             true = fabric_view:is_progress_possible(Workers),
             AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
                 rexi:stream_start(From),
@@ -61,36 +74,61 @@ stream_start(Workers0, Keypos) ->
             Else
     end.
 
-handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
-    case fabric_util:remove_down_workers(State, NodeRef) of
-    {ok, NewState} ->
-        {ok, NewState};
+handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
+    case fabric_util:remove_down_workers(St#stream_acc.workers, NodeRef) of
+    {ok, Workers} ->
+        {ok, St#stream_acc{workers=Workers}};
     error ->
         Reason = {nodedown, <<"progress not possible">>},
         {error, Reason}
     end;
-handle_stream_start({rexi_EXIT, Reason}, Worker, State) ->
-    NewState = fabric_dict:erase(Worker, State),
-    case fabric_view:is_progress_possible(NewState) of
-    true ->
-        {ok, NewState};
-    false ->
+handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
+    Workers = fabric_dict:erase(Worker, St#stream_acc.workers),
+    case {fabric_view:is_progress_possible(Workers), Reason} of
+    {true, _} ->
+        {ok, St#stream_acc{workers=Workers}};
+    {false, {maintenance_mode, _Node}} ->
+        % Check if we have replacements for this range
+        % and start the new workers if so.
+        case lists:keytake(Worker#shard.range, 1, St#stream_acc.replacements) of
+            {value, {_Range, WorkerReplacements}, NewReplacements} ->
+                FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
+                    NewWorker = (St#stream_acc.start_fun)(Repl),
+                    fabric_dict:store(NewWorker, waiting, NewWorkers)
+                end, Workers, WorkerReplacements),
+                % Assert that our replaced worker provides us
+                % the opportunity to make progress.
+                true = fabric_view:is_progress_possible(FinalWorkers),
+                NewRefs = fabric_dict:fetch_keys(FinalWorkers),
+                {new_refs, NewRefs, St#stream_acc{
+                    workers=FinalWorkers,
+                    replacements=NewReplacements
+                }};
+            false ->
+                % If progress isn't possible and we don't have any
+                % replacements then we're dead in the water.
+                Error = {nodedown, <<"progress not possible">>},
+                {error, Error}
+        end;
+    {false, _} ->
         {error, fabric_util:error_info(Reason)}
     end;
-handle_stream_start(rexi_STREAM_INIT, {Worker, From}, State) ->
-    case fabric_dict:lookup_element(Worker, State) of
+handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
+    case fabric_dict:lookup_element(Worker, St#stream_acc.workers) of
     undefined ->
         % This worker lost the race with other partition copies, terminate
         rexi:stream_cancel(From),
-        {ok, State};
+        {ok, St};
     waiting ->
         % Don't ack the worker yet so they don't start sending us
         % rows until we're ready
-        NewState0 = fabric_dict:store(Worker, From, State),
-        NewState1 = fabric_view:remove_overlapping_shards(Worker, NewState0),
-        case fabric_dict:any(waiting, NewState1) of
-            true -> {ok, NewState1};
-            false -> {stop, NewState1}
+        Workers0 = fabric_dict:store(Worker, From, St#stream_acc.workers),
+        Workers1 = fabric_view:remove_overlapping_shards(Worker, Workers0),
+        case fabric_dict:any(waiting, Workers1) of
+            true ->
+                {ok, St#stream_acc{workers=Workers1}};
+            false ->
+                {stop, St#stream_acc{workers=Workers1}}
         end
     end;
 handle_stream_start(Else, _, _) ->


[35/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Update _changes coordinator to use new RPC APIs

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/5fd7f6b3
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/5fd7f6b3
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/5fd7f6b3

Branch: refs/heads/windsor-merge-121
Commit: 5fd7f6b3a50240620adfa0a77eaca2a95fe7d826
Parents: 42e069a
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Sep 4 15:59:39 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 16:08:05 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 155 +++++++++++++++------------------------
 1 file changed, 59 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/5fd7f6b3/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 8d162ce..69d9d09 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -130,9 +130,30 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
             end, find_replacement_shards(Shard, AllLiveShards))
         end
     end, unpack_seqs(PackedSeqs, DbName)),
-    {Workers, _} = lists:unzip(Seqs),
-    RexiMon = fabric_util:create_monitors(Workers),
+    {Workers0, _} = lists:unzip(Seqs),
+    RexiMon = fabric_util:create_monitors(Workers0),
+    try
+        case fabric_util:stream_start(Workers0, #shard.ref) of
+            {ok, Workers} ->
+                try
+                    LiveSeqs = lists:filter(fun({W, _S}) ->
+                        lists:member(W, Workers)
+                    end, Seqs),
+                    send_changes(DbName, Workers, LiveSeqs, ChangesArgs,
+                            Callback, AccIn, Timeout)
+                after
+                    fabric_util:cleanup(Workers)
+                end;
+            Else ->
+                Callback(Else, AccIn)
+        end
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+send_changes(DbName, Workers, Seqs, ChangesArgs, Callback, AccIn, Timeout) ->
     State = #collector{
+        db_name = DbName,
         query_args = ChangesArgs,
         callback = Callback,
         counters = orddict:from_list(Seqs),
@@ -141,12 +162,7 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
         rows = Seqs % store sequence positions instead
     },
     %% TODO: errors need to be handled here
-    try
-        receive_results(Workers, State, Timeout, Callback)
-    after
-        rexi_monitor:stop(RexiMon),
-        fabric_util:cleanup(Workers)
-    end.
+    receive_results(Workers, State, Timeout, Callback).
 
 receive_results(Workers, State, Timeout, Callback) ->
     case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State,
@@ -159,25 +175,11 @@ receive_results(Workers, State, Timeout, Callback) ->
         {ok, NewState}
     end.
 
-handle_message({rexi_DOWN, _, {_, NodeRef}, _}, nil, State) ->
-    fabric_view:remove_down_shards(State, NodeRef);
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
+    fabric_view:check_down_shards(State, NodeRef);
 
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
-    #collector{
-        callback=Callback,
-        counters=Counters0,
-        rows = Seqs0,
-        user_acc=Acc
-    } = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    Seqs = fabric_dict:erase(Worker, Seqs0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters, rows=Seqs}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-        {error, Resp}
-    end;
+    fabric_view:handle_worker_exit(State, Worker, Reason);
 
 handle_message(_, _, #collector{limit=0} = State) ->
     {stop, State};
@@ -190,84 +192,45 @@ handle_message(#change{key=Key} = Row0, {Worker, From}, St) ->
         limit = Limit,
         user_acc = AccIn
     } = St,
-    case fabric_dict:lookup_element(Worker, S0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate it
-        gen_server:reply(From, stop),
-        {ok, St};
-    _ ->
-        S1 = fabric_dict:store(Worker, Key, S0),
-        S2 = fabric_view:remove_overlapping_shards(Worker, S1),
-        % this check should not be necessary at all, as holes in the ranges
-        % created from DOWN messages would have led to errors
-        case fabric_view:is_progress_possible(S2) of
-        true ->
-            % Temporary hack for FB 23637
-            Interval = erlang:get(changes_seq_interval),
-            if (Interval == undefined) orelse (Limit rem Interval == 0) ->
-                Row = Row0#change{key = pack_seqs(S2)};
-            true ->
-                Row = Row0#change{key = null}
-            end,
-            {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
-            gen_server:reply(From, Go),
-            {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}};
-        false ->
-            Reason = {range_not_covered, <<"progress not possible">>},
-            Callback({error, Reason}, AccIn),
-            gen_server:reply(From, stop),
-            {stop, St#collector{counters=S2}}
-        end
-    end;
+    true = fabric_dict:is_key(Worker, S0),
+    S1 = fabric_dict:store(Worker, Key, S0),
+    % Temporary hack for FB 23637
+    Interval = erlang:get(changes_seq_interval),
+    if (Interval == undefined) orelse (Limit rem Interval == 0) ->
+        Row = Row0#change{key = pack_seqs(S1)};
+    true ->
+        Row = Row0#change{key = null}
+    end,
+    {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
+    rexi:stream_ack(From),
+    {Go, St#collector{counters=S1, limit=Limit-1, user_acc=Acc}};
 
 handle_message({no_pass, Seq}, {Worker, From}, St) ->
-    #collector{
-        counters = S0
-    } = St,
-    case fabric_dict:lookup_element(Worker, S0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate it
-        gen_server:reply(From, stop),
-        {ok, St};
-    _ ->
-        S1 = fabric_dict:store(Worker, Seq, S0),
-        S2 = fabric_view:remove_overlapping_shards(Worker, S1),
-        gen_server:reply(From, ok),
-        {ok, St#collector{counters=S2}}
-    end;
+    #collector{counters = S0} = St,
+    true = fabric_dict:is_key(Worker, S0),
+    S1 = fabric_dict:store(Worker, Seq, S0),
+    rexi:stream_ack(From),
+    {ok, St#collector{counters=S1}};
 
 handle_message({complete, Key}, Worker, State) ->
     #collector{
-        callback = Callback,
         counters = S0,
-        total_rows = Completed, % override
-        user_acc = Acc
+        total_rows = Completed % override
     } = State,
-    case fabric_dict:lookup_element(Worker, S0) of
-    undefined ->
-        {ok, State};
-    _ ->
-        S1 = fabric_dict:store(Worker, Key, S0),
-        % unlikely to have overlaps here, but possible w/ filters
-        S2 = fabric_view:remove_overlapping_shards(Worker, S1),
-        NewState = State#collector{counters=S2, total_rows=Completed+1},
-        case fabric_dict:size(S2) =:= (Completed+1) of
-        true ->
-            % check ranges are covered, again this should not be neccessary
-            % as any holes in the ranges due to DOWN messages would have errored
-            % out sooner
-            case fabric_view:is_progress_possible(S2) of
-            true ->
-                {stop, NewState};
-            false ->
-                Reason = {range_not_covered, <<"progress not possible">>},
-                Callback({error, Reason}, Acc),
-                {stop, NewState}
-            end;
-        false ->
-            {ok, NewState}
-        end
-    end.
+    true = fabric_dict:is_key(Worker, S0),
+    S1 = fabric_dict:store(Worker, Key, S0),
+    NewState = State#collector{counters=S1, total_rows=Completed+1},
+    % We're relying on S1 having exactly the number of workers that
+    % are participating in this response. With the new stream_start
+    % that's a bit more obvious but historically it wasn't quite
+    % so clear. The Completed variable is just a hacky override
+    % of the total_rows field in the #collector{} record.
+    NumWorkers = fabric_dict:size(S1),
+    Go = case NumWorkers =:= (Completed+1) of
+        true -> stop;
+        false -> ok
+    end,
+    {Go, NewState}.
 
 make_changes_args(#changes_args{style=Style, filter=undefined}=Args) ->
     Args#changes_args{filter = Style};


[50/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Report errors opening documents during _all_docs

All errors are currently ignored in the receive statements and
eventually we time out.  This patch causes fabric to report the error and
terminate quickly instead of waiting.

BugzID: 24580


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/79e6e2fd
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/79e6e2fd
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/79e6e2fd

Branch: refs/heads/windsor-merge-121
Commit: 79e6e2fd179982c35a0264f39a9c5c267face1d1
Parents: df0cd0b
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Fri Oct 25 22:59:23 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 20:16:20 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_all_docs.erl | 39 +++++++++++++++++++++++++--------------
 1 file changed, 25 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/79e6e2fd/src/fabric_view_all_docs.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl
index 28a1f91..cda748a 100644
--- a/src/fabric_view_all_docs.erl
+++ b/src/fabric_view_all_docs.erl
@@ -69,12 +69,17 @@ go(DbName, Options, QueryArgs, Callback, Acc0) ->
         false -> Keys2
     end,
     Timeout = fabric_util:all_docs_timeout(),
-    receive {'DOWN', Ref0, _, _, {ok, TotalRows}} ->
-        {ok, Acc1} = Callback({meta, [{total, TotalRows}]}, Acc0),
-        {ok, Acc2} = doc_receive_loop(
-            Keys3, queue:new(), SpawnFun, MaxJobs, Callback, Acc1
-        ),
-        Callback(complete, Acc2)
+    receive {'DOWN', Ref0, _, _, Result} ->
+        case Result of
+            {ok, TotalRows} ->
+                {ok, Acc1} = Callback({meta, [{total, TotalRows}]}, Acc0),
+                {ok, Acc2} = doc_receive_loop(
+                    Keys3, queue:new(), SpawnFun, MaxJobs, Callback, Acc1
+                ),
+                Callback(complete, Acc2);
+            Error ->
+                Callback({error, Error}, Acc0)
+        end
     after Timeout ->
         Callback(timeout, Acc0)
     end.
@@ -178,15 +183,21 @@ doc_receive_loop(Keys, Pids, SpawnFun, MaxJobs, Callback, AccIn) ->
     _ ->
         {{value, {Pid, Ref}}, RestPids} = queue:out(Pids),
         Timeout = fabric_util:all_docs_timeout(),
-        receive {'DOWN', Ref, process, Pid, #view_row{} = Row} ->
-            case Callback(fabric_view:transform_row(Row), AccIn) of
-            {ok, Acc} ->
-                doc_receive_loop(
-                    Keys, RestPids, SpawnFun, MaxJobs, Callback, Acc
-                );
-            {stop, Acc} ->
+        receive {'DOWN', Ref, process, Pid, Row} ->
+            case Row of
+            #view_row{} ->
+                case Callback(fabric_view:transform_row(Row), AccIn) of
+                {ok, Acc} ->
+                    doc_receive_loop(
+                        Keys, RestPids, SpawnFun, MaxJobs, Callback, Acc
+                    );
+                {stop, Acc} ->
+                    cancel_read_pids(RestPids),
+                    {ok, Acc}
+                end;
+            Error ->
                 cancel_read_pids(RestPids),
-                {ok, Acc}
+                Callback({error, Error}, AccIn)
             end
         after Timeout ->
             timeout


[02/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Make shard open backoff configurable


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/518f76d1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/518f76d1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/518f76d1

Branch: refs/heads/windsor-merge-121
Commit: 518f76d1e2d1051982220528a113c0007ad17816
Parents: ae1c64a
Author: Benjamin Anderson <b...@banjiewen.net>
Authored: Thu Jun 20 15:13:00 2013 -0700
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 10:49:10 2014 +0100

----------------------------------------------------------------------
 src/fabric_util.erl | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/518f76d1/src/fabric_util.erl
----------------------------------------------------------------------
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index 52055e9..c93efda 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -64,20 +64,21 @@ get_db(DbName, Options) ->
     % suppress shards from down nodes
     Nodes = [node()|erlang:nodes()],
     Live = [S || #shard{node = N} = S <- Shards, lists:member(N, Nodes)],
-    get_shard(Live, Options, 100).
+    Factor = list_to_integer(config:get("fabric", "shard_timeout_factor", "2")),
+    get_shard(Live, Options, 100, Factor).
 
-get_shard([], _Opts, _Timeout) ->
+get_shard([], _Opts, _Timeout, _Factor) ->
     erlang:error({internal_server_error, "No DB shards could be opened."});
-get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout) ->
+get_shard([#shard{node = Node, name = Name} | Rest], Opts, Timeout, Factor) ->
     case rpc:call(Node, couch_db, open, [Name, [{timeout, Timeout} | Opts]]) of
     {ok, Db} ->
         {ok, Db};
     {unauthorized, _} = Error ->
         throw(Error);
     {badrpc, {'EXIT', {timeout, _}}} ->
-        get_shard(Rest, Opts, 2*Timeout);
+        get_shard(Rest, Opts, Factor * Timeout, Factor);
     _Else ->
-        get_shard(Rest, Opts, Timeout)
+        get_shard(Rest, Opts, Timeout, Factor)
     end.
 
 error_info({{<<"reduce_overflow_error">>, _} = Error, _Stack}) ->


[29/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Implement fabric_view:check_down_shards/2

This replaces fabric_view:remove_down_shards/2 with a check to see if a
rexi_DOWN message affects any of the worker shards. This is because we
monitor all the shards but are only handling messages from a single ring
which may not have included a node in the initial shards list.

We no longer need to remove shards on down nodes because, once our stream
handling functions have run, we know as soon as we see a shard on a downed
node that progress is no longer possible under our current behavior.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/e4199a8e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/e4199a8e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/e4199a8e

Branch: refs/heads/windsor-merge-121
Commit: e4199a8e1dac8dca08b1edf349b0c3883c251180
Parents: f0c675e
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Sep 10 20:56:27 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 13:55:55 2014 +0100

----------------------------------------------------------------------
 src/fabric_view.erl | 23 +++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/e4199a8e/src/fabric_view.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view.erl b/src/fabric_view.erl
index 117dead..be41ce1 100644
--- a/src/fabric_view.erl
+++ b/src/fabric_view.erl
@@ -14,24 +14,27 @@
 
 -export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
     transform_row/1, keydict/1, extract_view/4, get_shards/2,
-    remove_down_shards/2]).
+    check_down_shards/2]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
--spec remove_down_shards(#collector{}, node()) ->
+%% @doc Check if a downed node affects any of our workers
+-spec check_down_shards(#collector{}, node()) ->
     {ok, #collector{}} | {error, any()}.
-remove_down_shards(Collector, BadNode) ->
+check_down_shards(Collector, BadNode) ->
     #collector{callback=Callback, counters=Counters, user_acc=Acc} = Collector,
-    case fabric_util:remove_down_workers(Counters, BadNode) of
-    {ok, NewCounters} ->
-        {ok, Collector#collector{counters = NewCounters}};
-    error ->
-        Reason = {nodedown, <<"progress not possible">>},
-        Callback({error, Reason}, Acc),
-        {error, Reason}
+    Filter = fun(#shard{node = Node}, _) -> Node == BadNode end,
+    BadCounters = fabric_dict:filter(Filter, Counters),
+    case fabric_dict:size(BadCounters) > 0 of
+        true ->
+            Reason = {nodedown, <<"progress not possible">>},
+            Callback({error, Reason}, Acc),
+            {error, Reason};
+        false ->
+            {ok, Collector}
     end.
 
 %% @doc looks for a fully covered keyrange in the list of counters


[47/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Pass document id & rev rather than the whole document

This uses the ddoc_cache support for revisions. It should
reduce bandwidth overhead of copying documents around.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/1c2f572d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/1c2f572d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/1c2f572d

Branch: refs/heads/windsor-merge-121
Commit: 1c2f572d04f5026ae343fac55b64f4360ae690ae
Parents: eec85af
Author: Brian Mitchell <br...@cloudant.com>
Authored: Mon Oct 21 13:14:40 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 19:58:57 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl         | 14 ++++++++------
 src/fabric_util.erl        |  7 ++++++-
 src/fabric_view_map.erl    |  2 +-
 src/fabric_view_reduce.erl |  2 +-
 4 files changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/1c2f572d/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index d61da40..cee30f2 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -64,10 +64,11 @@ all_docs(DbName, Options, #mrargs{keys=undefined} = Args0) ->
     couch_mrview:query_all_docs(Db, Args, fun view_cb/2, VAcc0).
 
 %% @equiv map_view(DbName, DDoc, ViewName, Args0, [])
-map_view(DbName, DDoc, ViewName, Args0) ->
-    map_view(DbName, DDoc, ViewName, Args0, []).
+map_view(DbName, DDocInfo, ViewName, Args0) ->
+    map_view(DbName, DDocInfo, ViewName, Args0, []).
 
-map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
+map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
+    {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev),
     set_io_priority(DbName, DbOptions),
     Args = fix_skip_and_limit(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
@@ -75,10 +76,11 @@ map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0).
 
 %% @equiv reduce_view(DbName, DDoc, ViewName, Args0)
-reduce_view(DbName, DDoc, ViewName, Args0) ->
-    reduce_view(DbName, DDoc, ViewName, Args0, []).
+reduce_view(DbName, DDocInfo, ViewName, Args0) ->
+    reduce_view(DbName, DDocInfo, ViewName, Args0, []).
 
-reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
+reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
+    {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev),
     set_io_priority(DbName, DbOptions),
     Args = fix_skip_and_limit(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/1c2f572d/src/fabric_util.erl
----------------------------------------------------------------------
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index 344ead7..642d7d6 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -14,10 +14,12 @@
 
 -export([submit_jobs/3, submit_jobs/4, cleanup/1, recv/4, get_db/1, get_db/2, error_info/1,
         update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
-        remove_down_workers/2]).
+        remove_down_workers/2, doc_id_and_rev/1]).
 -export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0]).
 -export([stream_start/2, stream_start/4]).
 
+-compile({inline, [{doc_id_and_rev,1}]}).
+
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
@@ -271,3 +273,6 @@ remove_ancestors_test() ->
 %% test function
 kv(Item, Count) ->
     {make_key(Item), {Item,Count}}.
+
+doc_id_and_rev(#doc{id=DocId, revs={RevNum, [RevHash|_]}}) ->
+    {DocId, {RevNum, RevHash}}.

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/1c2f572d/src/fabric_view_map.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index d2c010a..1201daf 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -26,7 +26,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
 go(DbName, DDoc, View, Args, Callback, Acc) ->
     Shards = fabric_view:get_shards(DbName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
-    RPCArgs = [DDoc, View, Args],
+    RPCArgs = [fabric_util:doc_id_and_rev(DDoc), View, Args],
     StartFun = fun(Shard) ->
         hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
     end,

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/1c2f572d/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 67c36a3..2e0d1f2 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -25,7 +25,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
 
 go(DbName, DDoc, VName, Args, Callback, Acc, {red, {_, Lang, _}, _}=VInfo) ->
     RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
-    RPCArgs = [DDoc, VName, Args],
+    RPCArgs = [fabric_util:doc_id_and_rev(DDoc), VName, Args],
     Shards = fabric_view:get_shards(DbName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
     StartFun = fun(Shard) ->


[37/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Fix `fabric:get_db_info/1`

We weren't properly removing shards that had an error, and we were
returning errors to functions that don't expect them.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/fdc0c810
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/fdc0c810
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/fdc0c810

Branch: refs/heads/windsor-merge-121
Commit: fdc0c8104ddd51d7b696392af6d399f42fb0ff12
Parents: a8b9e3f
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Oct 4 16:34:11 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 16:13:33 2014 +0100

----------------------------------------------------------------------
 src/fabric_db_info.erl | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/fdc0c810/src/fabric_db_info.erl
----------------------------------------------------------------------
diff --git a/src/fabric_db_info.erl b/src/fabric_db_info.erl
index 58139e8..de88632 100644
--- a/src/fabric_db_info.erl
+++ b/src/fabric_db_info.erl
@@ -21,9 +21,13 @@ go(DbName) ->
     Shards = mem3:shards(DbName),
     Workers = fabric_util:submit_jobs(Shards, get_db_info, []),
     RexiMon = fabric_util:create_monitors(Shards),
+    Fun = fun handle_message/3,
     Acc0 = {fabric_dict:init(Workers, nil), []},
     try
-        fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0)
+        case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
+            {ok, Acc} -> {ok, Acc};
+            {error, Error} -> throw(Error)
+        end
     after
         rexi_monitor:stop(RexiMon)
     end.
@@ -37,7 +41,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
     end;
 
 handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
-    NewCounters = lists:keydelete(Shard, #shard.ref, Counters),
+    NewCounters = fabric_dict:erase(Shard, Counters),
     case fabric_view:is_progress_possible(NewCounters) of
     true ->
         {ok, {NewCounters, Acc}};


[09/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Validate epochs


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/bef303b1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/bef303b1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/bef303b1

Branch: refs/heads/windsor-merge-121
Commit: bef303b1ceb9d37e0059dff9aa069e52d234236d
Parents: a8a458b
Author: Robert Newson <ro...@cloudant.com>
Authored: Sat Aug 10 19:19:04 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 10:54:26 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/bef303b1/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 825691b..bc0486a 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -384,6 +384,7 @@ calculate_start_seq(Db, {Seq, Uuid, Node}) ->
     end.
 
 is_owner(Node, Seq, Epochs) ->
+    validate_epochs(Epochs),
     Node =:= owner_of(Seq, Epochs).
 
 owner_of(_Seq, []) ->
@@ -393,6 +394,18 @@ owner_of(Seq, [{EpochNode, EpochSeq} | _Rest]) when Seq > EpochSeq ->
 owner_of(Seq, [_ | Rest]) ->
     owner_of(Seq, Rest).
 
+validate_epochs(Epochs) ->
+    %% Assert uniqueness.
+    case length(Epochs) == length(lists:ukeysort(2, Epochs)) of
+        true  -> ok;
+        false -> erlang:error(duplicate_epoch)
+    end,
+    %% Assert order.
+    case Epochs == lists:sort(fun({_, A}, {_, B}) -> B =< A end, Epochs) of
+        true  -> ok;
+        false -> erlang:error(epoch_order)
+    end.
+
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
 
@@ -416,6 +429,8 @@ is_owner_test() ->
     ?assert(is_owner(foo, 2, [{foo, 1}])),
     ?assert(is_owner(foo, 50, [{bar, 100}, {foo, 1}])),
     ?assert(is_owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])),
-    ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])).
+    ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])),
+    ?assertError(duplicate_epoch, is_owner(foo, 1, [{foo, 1}, {bar, 1}])),
+    ?assertError(epoch_order, is_owner(foo, 1, [{foo, 100}, {bar, 200}])).
 
 -endif.


[04/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Revert "Revert uuid in changes feeds temporarily"

This reverts commit 0796c6714978fd8302192c24f652c53844373773.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/400bce84
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/400bce84
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/400bce84

Branch: refs/heads/windsor-merge-121
Commit: 400bce8475b80abf16d982ebb1528d998d8ea28f
Parents: 9aafe7b
Author: Robert Newson <ro...@cloudant.com>
Authored: Wed Jul 31 00:11:29 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 10:51:19 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 70 ++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 63 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/400bce84/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index c6ac263..870ac57 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -29,18 +29,19 @@
 
 changes(DbName, #changes_args{} = Args, StartSeq) ->
     changes(DbName, [Args], StartSeq);
-changes(DbName, Options, StartSeq) ->
+changes(DbName, Options, StartVector) ->
     erlang:put(io_priority, {interactive, DbName}),
     #changes_args{dir=Dir} = Args = lists:keyfind(changes_args, 1, Options),
     case get_or_create_db(DbName, []) of
     {ok, Db} ->
+        StartSeq = calculate_start_seq(Db, StartVector),
         Enum = fun changes_enumerator/2,
         Opts = [{dir,Dir}],
         Acc0 = {Db, StartSeq, Args, Options},
         try
             {ok, {_, LastSeq, _, _}} =
                 couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0),
-            rexi:reply({complete, LastSeq})
+            rexi:reply({complete, {LastSeq, couch_db:get_uuid(Db), node()}})
         after
             couch_db:close(Db)
         end;
@@ -278,11 +279,14 @@ changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->
 
 changes_row(Db, #doc_info{id=Id, high_seq=Seq}=DI, Results, Del, true, Opts) ->
     Doc = doc_member(Db, DI, Opts),
-    #change{key=Seq, id=Id, value=Results, doc=Doc, deleted=Del};
-changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) ->
-    #change{key=Seq, id=Id, value=Results, deleted=true};
-changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) ->
-    #change{key=Seq, id=Id, value=Results}.
+    Uuid = couch_db:get_uuid(Db),
+    #change{key={Seq, Uuid, node()}, id=Id, value=Results, doc=Doc, deleted=Del};
+changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) ->
+    Uuid = couch_db:get_uuid(Db),
+    #change{key={Seq, Uuid, node()}, id=Id, value=Results, deleted=true};
+changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) ->
+    Uuid = couch_db:get_uuid(Db),
+    #change{key={Seq, Uuid, node()}, id=Id, value=Results}.
 
 doc_member(Shard, DocInfo, Opts) ->
     case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of
@@ -363,3 +367,55 @@ set_io_priority(DbName, Options) ->
         _ ->
             ok
     end.
+
+calculate_start_seq(_Db, Seq) when is_integer(Seq) ->
+    Seq;
+calculate_start_seq(Db, {Seq, Uuid, Node}) ->
+    case couch_db:get_uuid(Db) == Uuid of
+        true ->
+            case owner(Node, Seq, couch_db:get_epochs(Db)) of
+                true -> Seq;
+                false -> 0
+            end;
+        false ->
+            %% The file was rebuilt, most likely in a different
+            %% order, so rewind.
+            0
+    end.
+
+owner(Node, Seq, Epochs) ->
+    owner(Node, Seq, Epochs, infinity).
+
+owner(_Node, _Seq, [], _HighSeq) ->
+    false;
+owner(Node, Seq, [{EpochNode, EpochSeq} | _Rest], HighSeq)
+  when Node =:= EpochNode andalso Seq < HighSeq andalso Seq >= EpochSeq ->
+    true;
+owner(Node, Seq, [{_EpochNode, EpochSeq} | Rest], _HighSeq) ->
+    owner(Node, Seq, Rest, EpochSeq).
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+calculate_start_seq_test() ->
+    %% uuid mismatch is always a rewind.
+    Hdr1 = couch_db_header:new(),
+    Hdr2 = couch_db_header:set(Hdr1, [{uuid, uuid1}]),
+    ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, {1, uuid2, node1})),
+    %% uuid matches and seq is owned by node.
+    Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]),
+    ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, {2, uuid1, node1})),
+    %% uuids match but seq is not owned by node.
+    Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]),
+    ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, {3, uuid1, node1})),
+    %% return integer if we didn't get a vector.
+    ?assertEqual(4, calculate_start_seq(#db{}, 4)).
+
+owner_test() ->
+    ?assertNot(owner(foo, 1, [])),
+    ?assert(owner(foo, 1, [{foo, 1}])),
+    ?assert(owner(foo, 50, [{bar, 100}, {foo, 1}])),
+    ?assert(owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])),
+    ?assert(owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])).
+
+-endif.


[17/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Include uuid in since values

BugzID: 21658


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/d9cf9463
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/d9cf9463
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/d9cf9463

Branch: refs/heads/windsor-merge-121
Commit: d9cf9463ee4f187e5ef20e8373a8f023191f6c18
Parents: 674c211
Author: Robert Newson <ro...@cloudant.com>
Authored: Wed Sep 11 16:26:34 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 11:12:24 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl          | 92 +++++++++++++++++++++++++++++++++++++---
 src/fabric_view_changes.erl |  6 +--
 2 files changed, 88 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/d9cf9463/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index c6ac263..20d7a5a 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -29,18 +29,19 @@
 
 changes(DbName, #changes_args{} = Args, StartSeq) ->
     changes(DbName, [Args], StartSeq);
-changes(DbName, Options, StartSeq) ->
+changes(DbName, Options, StartVector) ->
     erlang:put(io_priority, {interactive, DbName}),
     #changes_args{dir=Dir} = Args = lists:keyfind(changes_args, 1, Options),
     case get_or_create_db(DbName, []) of
     {ok, Db} ->
+        StartSeq = calculate_start_seq(Db, node(), StartVector),
         Enum = fun changes_enumerator/2,
         Opts = [{dir,Dir}],
         Acc0 = {Db, StartSeq, Args, Options},
         try
             {ok, {_, LastSeq, _, _}} =
                 couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0),
-            rexi:reply({complete, LastSeq})
+            rexi:reply({complete, {LastSeq, uuid(Db)}})
         after
             couch_db:close(Db)
         end;
@@ -278,11 +279,11 @@ changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->
 
 changes_row(Db, #doc_info{id=Id, high_seq=Seq}=DI, Results, Del, true, Opts) ->
     Doc = doc_member(Db, DI, Opts),
-    #change{key=Seq, id=Id, value=Results, doc=Doc, deleted=Del};
-changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) ->
-    #change{key=Seq, id=Id, value=Results, deleted=true};
-changes_row(_, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) ->
-    #change{key=Seq, id=Id, value=Results}.
+    #change{key={Seq, uuid(Db)}, id=Id, value=Results, doc=Doc, deleted=Del};
+changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) ->
+    #change{key={Seq, uuid(Db)}, id=Id, value=Results, deleted=true};
+changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) ->
+    #change{key={Seq, uuid(Db)}, id=Id, value=Results}.
 
 doc_member(Shard, DocInfo, Opts) ->
     case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of
@@ -363,3 +364,80 @@ set_io_priority(DbName, Options) ->
         _ ->
             ok
     end.
+
+calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) ->
+    Seq;
+calculate_start_seq(Db, Node, {Seq, Uuid}) ->
+    case is_prefix(Uuid, couch_db:get_uuid(Db)) of
+        true ->
+            case is_owner(Node, Seq, couch_db:get_epochs(Db)) of
+                true -> Seq;
+                false -> 0
+            end;
+        false ->
+            %% The file was rebuilt, most likely in a different
+            %% order, so rewind.
+            0
+    end.
+
+is_prefix(Pattern, Subject) ->
+     binary:longest_common_prefix([Pattern, Subject]) == size(Pattern).
+
+is_owner(Node, Seq, Epochs) ->
+    validate_epochs(Epochs),
+    Node =:= owner_of(Seq, Epochs).
+
+owner_of(_Seq, []) ->
+    undefined;
+owner_of(Seq, [{EpochNode, EpochSeq} | _Rest]) when Seq > EpochSeq ->
+    EpochNode;
+owner_of(Seq, [_ | Rest]) ->
+    owner_of(Seq, Rest).
+
+validate_epochs(Epochs) ->
+    %% Assert uniqueness.
+    case length(Epochs) == length(lists:ukeysort(2, Epochs)) of
+        true  -> ok;
+        false -> erlang:error(duplicate_epoch)
+    end,
+    %% Assert order.
+    case Epochs == lists:sort(fun({_, A}, {_, B}) -> B =< A end, Epochs) of
+        true  -> ok;
+        false -> erlang:error(epoch_order)
+    end.
+
+uuid(Db) ->
+    Uuid = couch_db:get_uuid(Db),
+    binary:part(Uuid, {0, uuid_prefix_len()}).
+
+uuid_prefix_len() ->
+    list_to_integer(config:get("fabric", "uuid_prefix_len", "7")).
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+calculate_start_seq_test() ->
+    %% uuid mismatch is always a rewind.
+    Hdr1 = couch_db_header:new(),
+    Hdr2 = couch_db_header:set(Hdr1, [{epochs, [{node1, 1}]}, {uuid, <<"uuid1">>}]),
+    ?assertEqual(0, calculate_start_seq(#db{header=Hdr2}, node1, {1, <<"uuid2">>})),
+    %% uuid matches and seq is owned by node.
+    Hdr3 = couch_db_header:set(Hdr2, [{epochs, [{node1, 1}]}]),
+    ?assertEqual(2, calculate_start_seq(#db{header=Hdr3}, node1, {2, <<"uuid1">>})),
+    %% uuids match but seq is not owned by node.
+    Hdr4 = couch_db_header:set(Hdr2, [{epochs, [{node2, 2}, {node1, 1}]}]),
+    ?assertEqual(0, calculate_start_seq(#db{header=Hdr4}, node1, {3, <<"uuid1">>})),
+    %% return integer if we didn't get a vector.
+    ?assertEqual(4, calculate_start_seq(#db{}, foo, 4)).
+
+is_owner_test() ->
+    ?assertNot(is_owner(foo, 1, [])),
+    ?assertNot(is_owner(foo, 1, [{foo, 1}])),
+    ?assert(is_owner(foo, 2, [{foo, 1}])),
+    ?assert(is_owner(foo, 50, [{bar, 100}, {foo, 1}])),
+    ?assert(is_owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])),
+    ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])),
+    ?assertError(duplicate_epoch, is_owner(foo, 1, [{foo, 1}, {bar, 1}])),
+    ?assertError(epoch_order, is_owner(foo, 1, [{foo, 100}, {bar, 200}])).
+
+-endif.

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/d9cf9463/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 293541e..4e0cd9e 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -297,8 +297,8 @@ pack_seqs(Workers) ->
     Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])),
     [SeqSum, Opaque].
 
-seq({Seq, _Uuid, _Node}) -> Seq;
-seq(Seq)                 -> Seq.
+seq({Seq, _Uuid}) -> Seq;
+seq(Seq)          -> Seq.
 
 unpack_seqs(0, DbName) ->
     fabric_dict:init(mem3:shards(DbName), 0);
@@ -339,7 +339,7 @@ do_unpack_seqs(Opaque, DbName) ->
     Unpacked = lists:flatmap(fun({Node, [A,B], Seq}) ->
         case mem3:get_shard(DbName, Node, [A,B]) of
         {ok, Shard} ->
-            [{Shard, seq(Seq)}];
+            [{Shard, Seq}];
         {error, not_found} ->
             []
         end


[23/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Allow options to all database functions

This is the first of two commits. The second needs to be in a second
release.

BugzId: 23787


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/455afa77
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/455afa77
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/455afa77

Branch: refs/heads/windsor-merge-121
Commit: 455afa777310049de563b23205640fa1f7d859ad
Parents: 687bd9d
Author: Paul J. Davis <pa...@gmail.com>
Authored: Mon Sep 30 13:52:05 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 13:42:35 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 51 +++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 41 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/455afa77/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 2cc31ff..7f9b88b 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -20,6 +20,9 @@
     set_revs_limit/3, create_shard_db_doc/2, delete_shard_db_doc/2]).
 -export([get_all_security/2]).
 
+-export([get_db_info/2, get_doc_count/2, get_update_seq/2,
+         changes/4, map_view/5, reduce_view/5, group_info/3]).
+
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
@@ -27,12 +30,16 @@
 %% rpc endpoints
 %%  call to with_db will supply your M:F with a #db{} and then remaining args
 
-changes(DbName, #changes_args{} = Args, StartSeq) ->
-    changes(DbName, [Args], StartSeq);
-changes(DbName, Options, StartVector) ->
+%% @equiv changes(DbName, Args, StartSeq, [])
+changes(DbName, Args, StartSeq) ->
+    changes(DbName, Args, StartSeq, []).
+
+changes(DbName, #changes_args{} = Args, StartSeq, DbOptions) ->
+    changes(DbName, [Args], StartSeq, DbOptions);
+changes(DbName, Options, StartVector, DbOptions) ->
     erlang:put(io_priority, {interactive, DbName}),
     #changes_args{dir=Dir} = Args = lists:keyfind(changes_args, 1, Options),
-    case get_or_create_db(DbName, []) of
+    case get_or_create_db(DbName, DbOptions) of
     {ok, Db} ->
         StartSeq = calculate_start_seq(Db, node(), StartVector),
         Enum = fun changes_enumerator/2,
@@ -55,15 +62,23 @@ all_docs(DbName, Options, #mrargs{keys=undefined} = Args0) ->
     VAcc0 = #vacc{db=Db},
     couch_mrview:query_all_docs(Db, Args, fun view_cb/2, VAcc0).
 
+%% @equiv map_view(DbName, DDoc, ViewName, Args0, [])
 map_view(DbName, DDoc, ViewName, Args0) ->
+    map_view(DbName, DDoc, ViewName, Args0, []).
+
+map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     Args = fix_skip_and_limit(Args0),
-    {ok, Db} = get_or_create_db(DbName, []),
+    {ok, Db} = get_or_create_db(DbName, DbOptions),
     VAcc0 = #vacc{db=Db},
     couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0).
 
+%% @equiv reduce_view(DbName, DDoc, ViewName, Args0, [])
 reduce_view(DbName, DDoc, ViewName, Args0) ->
+    reduce_view(DbName, DDoc, ViewName, Args0, []).
+
+reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     Args = fix_skip_and_limit(Args0),
-    {ok, Db} = get_or_create_db(DbName, []),
+    {ok, Db} = get_or_create_db(DbName, DbOptions),
     VAcc0 = #vacc{db=Db},
     couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0).
 
@@ -88,14 +103,26 @@ delete_db(DbName) ->
 delete_shard_db_doc(_, DocId) ->
     rexi:reply(mem3_util:delete_db_doc(DocId)).
 
+%% @equiv get_db_info(DbName, [])
 get_db_info(DbName) ->
-    with_db(DbName, [], {couch_db, get_db_info, []}).
+    get_db_info(DbName, []).
 
+get_db_info(DbName, DbOptions) ->
+    with_db(DbName, DbOptions, {couch_db, get_db_info, []}).
+
+%% @equiv get_doc_count(DbName, [])
 get_doc_count(DbName) ->
-    with_db(DbName, [], {couch_db, get_doc_count, []}).
+    get_doc_count(DbName, []).
+
+get_doc_count(DbName, DbOptions) ->
+    with_db(DbName, DbOptions, {couch_db, get_doc_count, []}).
 
+%% @equiv get_update_seq(DbName, [])
 get_update_seq(DbName) ->
-    with_db(DbName, [], {couch_db, get_update_seq, []}).
+    get_update_seq(DbName, []).
+
+get_update_seq(DbName, DbOptions) ->
+    with_db(DbName, DbOptions, {couch_db, get_update_seq, []}).
 
 set_security(DbName, SecObj, Options) ->
     with_db(DbName, Options, {couch_db, set_security, [SecObj]}).
@@ -144,8 +171,12 @@ update_docs(DbName, Docs0, Options) ->
     Docs = make_att_readers(Docs0),
     with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}).
 
+%% @equiv group_info(DbName, DDocId, [])
 group_info(DbName, DDocId) ->
-    with_db(DbName, [], {couch_mrview, get_info, [DDocId]}).
+    group_info(DbName, DDocId, []).
+
+group_info(DbName, DDocId, DbOptions) ->
+    with_db(DbName, DbOptions, {couch_mrview, get_info, [DDocId]}).
 
 reset_validation_funs(DbName) ->
     case get_or_create_db(DbName, []) of


[14/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Fix invalid reference in edoc

BugzID: 10870


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/f3711744
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/f3711744
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/f3711744

Branch: refs/heads/windsor-merge-121
Commit: f37117441e60204f8e2d1ae8af706fff1e36191c
Parents: e47db78
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Fri Aug 30 13:01:02 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 10:55:17 2014 +0100

----------------------------------------------------------------------
 src/fabric_db_delete.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/f3711744/src/fabric_db_delete.erl
----------------------------------------------------------------------
diff --git a/src/fabric_db_delete.erl b/src/fabric_db_delete.erl
index dced33c..2087f44 100644
--- a/src/fabric_db_delete.erl
+++ b/src/fabric_db_delete.erl
@@ -18,7 +18,7 @@
 
 %% @doc Options aren't used at all now in couch on delete but are left here
 %%      to be consistent with fabric_db_create for possible future use
-%% @see couch_server:delete_db
+%% @see couch_server:delete/2
 %%
 go(DbName, _Options) ->
     Shards = mem3:shards(DbName),


[43/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
[1/2] Use proplist instead of #change record

This commit lays the groundwork by translating the record into a
proplist.  A follow-on commit will cause the RPC workers to send the
proplist instead of the record.

It also converts the 'complete' tuple into a tagged proplist.

BugzID: 24236


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/f1d0c226
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/f1d0c226
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/f1d0c226

Branch: refs/heads/windsor-merge-121
Commit: f1d0c2266bf2a370765e734a1ba4aef1527f2dbc
Parents: 600d20d
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu Oct 17 11:45:08 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 17:20:18 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 56 ++++++++++++++++++++++++++++------------
 1 file changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/f1d0c226/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 19824bc..479c32b 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -189,10 +189,24 @@ handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
     fabric_view:handle_worker_exit(State, Worker, Reason);
 
+% Temporary upgrade clause - Case 24236
+handle_message({complete, Key}, Worker, State) when is_tuple(Key) ->
+    handle_message({complete, [{seq, Key}]}, Worker, State);
+
 handle_message(_, _, #collector{limit=0} = State) ->
     {stop, State};
 
-handle_message(#change{key=Key} = Row0, {Worker, From}, St) ->
+handle_message(#change{} = Row, {Worker, From}, St) ->
+    Change = {change, [
+        {seq, Row#change.key},
+        {id, Row#change.id},
+        {changes, Row#change.value},
+        {deleted, Row#change.deleted},
+        {doc, Row#change.doc}
+    ]},
+    handle_message(Change, {Worker, From}, St);
+
+handle_message({change, Props}, {Worker, From}, St) ->
     #collector{
         query_args = #changes_args{include_docs=IncludeDocs},
         callback = Callback,
@@ -201,15 +215,15 @@ handle_message(#change{key=Key} = Row0, {Worker, From}, St) ->
         user_acc = AccIn
     } = St,
     true = fabric_dict:is_key(Worker, S0),
-    S1 = fabric_dict:store(Worker, Key, S0),
+    S1 = fabric_dict:store(Worker, couch_util:get_value(seq, Props), S0),
     % Temporary hack for FB 23637
     Interval = erlang:get(changes_seq_interval),
     if (Interval == undefined) orelse (Limit rem Interval == 0) ->
-        Row = Row0#change{key = pack_seqs(S1)};
+        Props2 = lists:keyreplace(seq, 1, Props, {seq, pack_seqs(S1)});
     true ->
-        Row = Row0#change{key = null}
+        Props2 = lists:keyreplace(seq, 1, Props, {seq, null})
     end,
-    {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
+    {Go, Acc} = Callback(changes_row(Props2, IncludeDocs), AccIn),
     rexi:stream_ack(From),
     {Go, St#collector{counters=S1, limit=Limit-1, user_acc=Acc}};
 
@@ -220,7 +234,8 @@ handle_message({no_pass, Seq}, {Worker, From}, St) ->
     rexi:stream_ack(From),
     {ok, St#collector{counters=S1}};
 
-handle_message({complete, Key}, Worker, State) ->
+handle_message({complete, Props}, Worker, State) ->
+    Key = couch_util:get_value(seq, Props),
     #collector{
         counters = S0,
         total_rows = Completed % override
@@ -350,16 +365,25 @@ do_unpack_seqs(Opaque, DbName) ->
             Unpacked ++ [{R, 0} || R <- Replacements]
     end.
 
-changes_row(#change{key=Seq, id=Id, value=Value, deleted=true, doc=Doc}, true) ->
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, Doc}]}};
-changes_row(#change{key=Seq, id=Id, value=Value, deleted=true}, false) ->
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}};
-changes_row(#change{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) ->
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}};
-changes_row(#change{key=Seq, id=Id, value=Value, doc=Doc}, true) ->
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}};
-changes_row(#change{key=Seq, id=Id, value=Value}, false) ->
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}.
+changes_row(Props0, IncludeDocs) ->
+    Props1 = case {IncludeDocs, couch_util:get_value(doc, Props0)} of
+        {true, {error, Reason}} ->
+            % Transform {doc, {error, Reason}} to {error, Reason} for JSON
+            lists:keyreplace(doc, 1, Props0, {error, Reason});
+        {false, _} ->
+            lists:keydelete(doc, 1, Props0);
+        _ ->
+            Props0
+    end,
+    Props2 = case couch_util:get_value(deleted, Props1) of
+        true ->
+            Props1;
+        _ ->
+            lists:keydelete(deleted, 1, Props1)
+    end,
+    Allowed = [seq, id, changes, deleted, doc],
+    Props3 = lists:filter(fun({K,_V}) -> lists:member(K, Allowed) end, Props0),
+    {change, {Props3}}.
 
 find_replacement_shards(#shard{range=Range}, AllShards) ->
     % TODO make this moar betta -- we might have split or merged the partition


[41/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Provide replacement shards in reduce views

BugzId: 20423


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/238bbdc1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/238bbdc1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/238bbdc1

Branch: refs/heads/windsor-merge-121
Commit: 238bbdc18e69f28b5246328fdc63e736a1d60b46
Parents: 5d9a0e5
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Sep 12 10:22:16 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 17:14:40 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_reduce.erl | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/238bbdc1/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 16d6fb1..67c36a3 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -25,13 +25,16 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
 
 go(DbName, DDoc, VName, Args, Callback, Acc, {red, {_, Lang, _}, _}=VInfo) ->
     RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
-    Workers0 = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
-        Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,DDoc,VName,Args]}),
-        Shard#shard{ref = Ref}
-    end, fabric_view:get_shards(DbName, Args)),
+    RPCArgs = [DDoc, VName, Args],
+    Shards = fabric_view:get_shards(DbName, Args),
+    Repls = fabric_view:get_shard_replacements(DbName, Shards),
+    StartFun = fun(Shard) ->
+        hd(fabric_util:submit_jobs([Shard], fabric_rpc, reduce_view, RPCArgs))
+    end,
+    Workers0 = fabric_util:submit_jobs(Shards,fabric_rpc,reduce_view,RPCArgs),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
-        case fabric_util:stream_start(Workers0, #shard.ref) of
+        case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
             {ok, Workers} ->
                 try
                     go2(DbName, Workers, Lang, RedSrc, Args, Callback, Acc)


[48/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Handle fabric upgrade

We still need to handle direct document passing for older
clients. The new Id+Rev protocol will be a part of a
future upgrade.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/c5fbac59
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/c5fbac59
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/c5fbac59

Branch: refs/heads/windsor-merge-121
Commit: c5fbac597e83c952039b0a3e7649dd5d5b684d9b
Parents: 1c2f572
Author: Brian Mitchell <br...@p2p.io>
Authored: Thu Oct 24 10:12:58 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 20:11:02 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl         | 4 ++++
 src/fabric_view_map.erl    | 2 +-
 src/fabric_view_reduce.erl | 2 +-
 3 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/c5fbac59/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index cee30f2..6f8e810 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -69,6 +69,8 @@ map_view(DbName, DDocInfo, ViewName, Args0) ->
 
 map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
     {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev),
+    map_view(DbName, DDoc, ViewName, Args0, DbOptions);
+map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     set_io_priority(DbName, DbOptions),
     Args = fix_skip_and_limit(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
@@ -81,6 +83,8 @@ reduce_view(DbName, DDocInfo, ViewName, Args0) ->
 
 reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
     {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev),
+    reduce_view(DbName, DDoc, ViewName, Args0, DbOptions);
+reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     set_io_priority(DbName, DbOptions),
     Args = fix_skip_and_limit(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/c5fbac59/src/fabric_view_map.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index 1201daf..d2c010a 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -26,7 +26,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
 go(DbName, DDoc, View, Args, Callback, Acc) ->
     Shards = fabric_view:get_shards(DbName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
-    RPCArgs = [fabric_util:doc_id_and_rev(DDoc), View, Args],
+    RPCArgs = [DDoc, View, Args],
     StartFun = fun(Shard) ->
         hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
     end,

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/c5fbac59/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 2e0d1f2..67c36a3 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -25,7 +25,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
 
 go(DbName, DDoc, VName, Args, Callback, Acc, {red, {_, Lang, _}, _}=VInfo) ->
     RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
-    RPCArgs = [fabric_util:doc_id_and_rev(DDoc), VName, Args],
+    RPCArgs = [DDoc, VName, Args],
     Shards = fabric_view:get_shards(DbName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
     StartFun = fun(Shard) ->


[06/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Fix fencepost error


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/1f918692
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/1f918692
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/1f918692

Branch: refs/heads/windsor-merge-121
Commit: 1f918692c01521ef7d2fffcaa338004b0c80d63c
Parents: 6120920
Author: Robert Newson <ro...@cloudant.com>
Authored: Sat Aug 10 19:02:50 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 10:52:01 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/1f918692/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 870ac57..81b747b 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -389,7 +389,7 @@ owner(Node, Seq, Epochs) ->
 owner(_Node, _Seq, [], _HighSeq) ->
     false;
 owner(Node, Seq, [{EpochNode, EpochSeq} | _Rest], HighSeq)
-  when Node =:= EpochNode andalso Seq < HighSeq andalso Seq >= EpochSeq ->
+  when Node =:= EpochNode andalso Seq < HighSeq andalso Seq > EpochSeq ->
     true;
 owner(Node, Seq, [{_EpochNode, EpochSeq} | Rest], _HighSeq) ->
     owner(Node, Seq, Rest, EpochSeq).


[11/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Revert "Merge pull request #99 from cloudant/fix-fencepost"

This reverts commit 1cf3128d139ebd4e1d6055cdcfbdede43febe1be, reversing
changes made to 82223b0e44c83fb60e93f7653a3bebd22ea98bfe.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/d4620448
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/d4620448
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/d4620448

Branch: refs/heads/windsor-merge-121
Commit: d4620448cc0ae410614f9a5adf54426b2d9554db
Parents: a698d48
Author: Robert Newson <ro...@cloudant.com>
Authored: Wed Aug 14 17:28:11 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 10:54:50 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 49 +++++++++++++++++--------------------------------
 1 file changed, 17 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/d4620448/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index bc0486a..870ac57 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -373,7 +373,7 @@ calculate_start_seq(_Db, Seq) when is_integer(Seq) ->
 calculate_start_seq(Db, {Seq, Uuid, Node}) ->
     case couch_db:get_uuid(Db) == Uuid of
         true ->
-            case is_owner(Node, Seq, couch_db:get_epochs(Db)) of
+            case owner(Node, Seq, couch_db:get_epochs(Db)) of
                 true -> Seq;
                 false -> 0
             end;
@@ -383,28 +383,16 @@ calculate_start_seq(Db, {Seq, Uuid, Node}) ->
             0
     end.
 
-is_owner(Node, Seq, Epochs) ->
-    validate_epochs(Epochs),
-    Node =:= owner_of(Seq, Epochs).
-
-owner_of(_Seq, []) ->
-    undefined;
-owner_of(Seq, [{EpochNode, EpochSeq} | _Rest]) when Seq > EpochSeq ->
-    EpochNode;
-owner_of(Seq, [_ | Rest]) ->
-    owner_of(Seq, Rest).
-
-validate_epochs(Epochs) ->
-    %% Assert uniqueness.
-    case length(Epochs) == length(lists:ukeysort(2, Epochs)) of
-        true  -> ok;
-        false -> erlang:error(duplicate_epoch)
-    end,
-    %% Assert order.
-    case Epochs == lists:sort(fun({_, A}, {_, B}) -> B =< A end, Epochs) of
-        true  -> ok;
-        false -> erlang:error(epoch_order)
-    end.
+owner(Node, Seq, Epochs) ->
+    owner(Node, Seq, Epochs, infinity).
+
+owner(_Node, _Seq, [], _HighSeq) ->
+    false;
+owner(Node, Seq, [{EpochNode, EpochSeq} | _Rest], HighSeq)
+  when Node =:= EpochNode andalso Seq < HighSeq andalso Seq >= EpochSeq ->
+    true;
+owner(Node, Seq, [{_EpochNode, EpochSeq} | Rest], _HighSeq) ->
+    owner(Node, Seq, Rest, EpochSeq).
 
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
@@ -423,14 +411,11 @@ calculate_start_seq_test() ->
     %% return integer if we didn't get a vector.
     ?assertEqual(4, calculate_start_seq(#db{}, 4)).
 
-is_owner_test() ->
-    ?assertNot(is_owner(foo, 1, [])),
-    ?assertNot(is_owner(foo, 1, [{foo, 1}])),
-    ?assert(is_owner(foo, 2, [{foo, 1}])),
-    ?assert(is_owner(foo, 50, [{bar, 100}, {foo, 1}])),
-    ?assert(is_owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])),
-    ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])),
-    ?assertError(duplicate_epoch, is_owner(foo, 1, [{foo, 1}, {bar, 1}])),
-    ?assertError(epoch_order, is_owner(foo, 1, [{foo, 100}, {bar, 200}])).
+owner_test() ->
+    ?assertNot(owner(foo, 1, [])),
+    ?assert(owner(foo, 1, [{foo, 1}])),
+    ?assert(owner(foo, 50, [{bar, 100}, {foo, 1}])),
+    ?assert(owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])),
+    ?assert(owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])).
 
 -endif.


[46/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Fix inclusion of "doc":"undefined" in changes feed

We were reusing the wrong variable after removing the default values.

BugzId: 24356


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/eec85af5
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/eec85af5
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/eec85af5

Branch: refs/heads/windsor-merge-121
Commit: eec85af5f125526fe07032ad5b01c2ea8b93781b
Parents: 209a200
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Oct 22 00:02:46 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 17:22:52 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/eec85af5/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 479c32b..3ca60b1 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -382,7 +382,7 @@ changes_row(Props0, IncludeDocs) ->
             lists:keydelete(deleted, 1, Props1)
     end,
     Allowed = [seq, id, changes, deleted, doc],
-    Props3 = lists:filter(fun({K,_V}) -> lists:member(K, Allowed) end, Props0),
+    Props3 = lists:filter(fun({K,_V}) -> lists:member(K, Allowed) end, Props2),
     {change, {Props3}}.
 
 find_replacement_shards(#shard{range=Range}, AllShards) ->


[22/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Allow to omit seq from some rows in _changes response

This is a hack to improve the throughput of _changes on databases with
large Q values.  The record-based approach for sharing config options is
a pain to extend so we're going to be dirty and use the pdict here.

BugzID: 23637


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/687bd9d4
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/687bd9d4
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/687bd9d4

Branch: refs/heads/windsor-merge-121
Commit: 687bd9d48b2a688b68cc2a910e94e41be0a915bf
Parents: f38bc21
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Tue Oct 1 21:41:52 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 11:19:02 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/687bd9d4/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index dff9d4a..8d162ce 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -202,7 +202,13 @@ handle_message(#change{key=Key} = Row0, {Worker, From}, St) ->
         % created from DOWN messages would have led to errors
         case fabric_view:is_progress_possible(S2) of
         true ->
-            Row = Row0#change{key = pack_seqs(S2)},
+            % Temporary hack for FB 23637
+            Interval = erlang:get(changes_seq_interval),
+            if (Interval == undefined) orelse (Limit rem Interval == 0) ->
+                Row = Row0#change{key = pack_seqs(S2)};
+            true ->
+                Row = Row0#change{key = null}
+            end,
             {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
             gen_server:reply(From, Go),
             {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}};


[42/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Provide replacement shards in changes feeds

BugzId: 20423


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/600d20d3
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/600d20d3
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/600d20d3

Branch: refs/heads/windsor-merge-121
Commit: 600d20d39d130e8eb35037d9da56052f4fc12a9e
Parents: 238bbdc
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Sep 12 12:25:33 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 17:15:32 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/600d20d3/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 69d9d09..19824bc 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -131,14 +131,22 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
         end
     end, unpack_seqs(PackedSeqs, DbName)),
     {Workers0, _} = lists:unzip(Seqs),
+    Repls = fabric_view:get_shard_replacements(DbName, Workers0),
+    StartFun = fun(#shard{name=Name, node=N}=Shard) ->
+        Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, 0]}),
+        Shard#shard{ref = Ref}
+    end,
     RexiMon = fabric_util:create_monitors(Workers0),
     try
-        case fabric_util:stream_start(Workers0, #shard.ref) of
+        case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
             {ok, Workers} ->
                 try
-                    LiveSeqs = lists:filter(fun({W, _S}) ->
-                        lists:member(W, Workers)
-                    end, Seqs),
+                    LiveSeqs = lists:map(fun(W) ->
+                        case lists:keyfind(W, 1, Seqs) of
+                            {W, Seq} -> {W, Seq};
+                            _ -> {W, 0}
+                        end
+                    end, Workers),
                     send_changes(DbName, Workers, LiveSeqs, ChangesArgs,
                             Callback, AccIn, Timeout)
                 after


[21/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Handle duplicate docs in bulk update

Prior to this commit, fabric assumed that all docs in a bulk update
request were unique. In the case that they were not unique, fabric would
error and return a stack trace to the user. This commit makes fabric
identify docs in a bulk update by a ref rather than their id.

This commit is based on work done by Bob Dionne.

BugzID: 12540


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/f38bc21a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/f38bc21a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/f38bc21a

Branch: refs/heads/windsor-merge-121
Commit: f38bc21ac2ef05fb12d807f5cd67218851c0c04a
Parents: 2f1db98
Author: Benjamin Bastian <be...@gmail.com>
Authored: Fri Aug 23 11:41:48 2013 -0700
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 11:14:14 2014 +0100

----------------------------------------------------------------------
 src/fabric_doc_update.erl | 23 ++++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/f38bc21a/src/fabric_doc_update.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl
index ddae65d..9e2ce50 100644
--- a/src/fabric_doc_update.erl
+++ b/src/fabric_doc_update.erl
@@ -20,18 +20,20 @@
 
 go(_, [], _) ->
     {ok, []};
-go(DbName, AllDocs, Opts) ->
+go(DbName, AllDocs0, Opts) ->
+    AllDocs = tag_docs(AllDocs0),
     validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
     Options = lists:delete(all_or_nothing, Opts),
     GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) ->
-        Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs, Options]}),
+        Docs1 = untag_docs(Docs),
+        Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name,Docs1,Options]}),
         {Shard#shard{ref=Ref}, Docs}
     end, group_docs_by_shard(DbName, AllDocs)),
     {Workers, _} = lists:unzip(GroupedDocs),
     RexiMon = fabric_util:create_monitors(Workers),
     W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
     Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs,
-        dict:from_list([{Doc,[]} || Doc <- AllDocs])},
+        dict:new()},
     Timeout = fabric_util:request_timeout(),
     try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of
     {ok, {Health, Results}} when Health =:= ok; Health =:= accepted ->
@@ -80,7 +82,9 @@ handle_message({ok, Replies}, Worker, Acc0) ->
             {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}};
         {stop, W, FinalReplies} ->
             {stop, {ok, FinalReplies}}
-        end
+        end;
+    _ ->
+        {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}}
     end;
 handle_message({missing_stub, Stub}, _, _) ->
     throw({missing_stub, Stub});
@@ -91,6 +95,16 @@ handle_message({not_found, no_db_file} = X, Worker, Acc0) ->
 handle_message({bad_request, Msg}, _, _) ->
     throw({bad_request, Msg}).
 
+tag_docs([]) ->
+    [];
+tag_docs([#doc{meta=Meta}=Doc | Rest]) ->
+    [Doc#doc{meta=[{ref, make_ref()} | Meta]} | tag_docs(Rest)].
+
+untag_docs([]) ->
+    [];
+untag_docs([#doc{meta=Meta}=Doc | Rest]) ->
+    [Doc#doc{meta=lists:keydelete(ref, 1, Meta)} | untag_docs(Rest)].
+
 force_reply(Doc, [], {_, W, Acc}) ->
     {error, W, [{Doc, {error, internal_server_error}} | Acc]};
 force_reply(Doc, [FirstReply|_] = Replies, {Health, W, Acc}) ->
@@ -157,7 +171,6 @@ append_update_replies([Doc|Rest], [], Dict0) ->
     % icky, if replicated_changes only errors show up in result
     append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
 append_update_replies([Doc|Rest1], [Reply|Rest2], Dict0) ->
-    % TODO what if the same document shows up twice in one update_docs call?
     append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).
 
 skip_message({0, _, W, _, DocReplyDict}) ->


[30/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Implement fabric_view:handle_worker_exit/3

This is a simple utility function that we can use to cancel a streaming
response when a worker dies. By the time we get here we know that
progress is no longer possible so we report the error and abort the
stream.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/dae200f1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/dae200f1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/dae200f1

Branch: refs/heads/windsor-merge-121
Commit: dae200f1a7ccb3badcea9bbabf3336f9d5a82311
Parents: e4199a8
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Sep 10 20:58:38 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 13:56:03 2014 +0100

----------------------------------------------------------------------
 src/fabric_view.erl | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/dae200f1/src/fabric_view.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view.erl b/src/fabric_view.erl
index be41ce1..690625b 100644
--- a/src/fabric_view.erl
+++ b/src/fabric_view.erl
@@ -14,7 +14,7 @@
 
 -export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
     transform_row/1, keydict/1, extract_view/4, get_shards/2,
-    check_down_shards/2]).
+    check_down_shards/2, handle_worker_exit/3]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -37,6 +37,13 @@ check_down_shards(Collector, BadNode) ->
             {ok, Collector}
     end.
 
+%% @doc Handle a worker that dies during a stream
+-spec handle_worker_exit(#collector{}, #shard{}, any()) -> {error, any()}.
+handle_worker_exit(Collector, _Worker, Reason) ->
+    #collector{callback=Callback, user_acc=Acc} = Collector,
+    {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
+    {error, Resp}.
+
 %% @doc looks for a fully covered keyrange in the list of counters
 -spec is_progress_possible([{#shard{}, term()}]) -> boolean().
 is_progress_possible([]) ->


[36/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Set io_priority properly for all requests

Notably, changes wasn't calling set_io_priority, so it was not affected by
maintenance_mode.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/a8b9e3fa
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/a8b9e3fa
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/a8b9e3fa

Branch: refs/heads/windsor-merge-121
Commit: a8b9e3faa1af2ca1ec0495553b105868d9e92262
Parents: 5fd7f6b
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Oct 4 16:35:13 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 16:12:59 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/a8b9e3fa/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 7f9b88b..d61da40 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -37,7 +37,7 @@ changes(DbName, Args, StartSeq) ->
 changes(DbName, #changes_args{} = Args, StartSeq, DbOptions) ->
     changes(DbName, [Args], StartSeq, DbOptions);
 changes(DbName, Options, StartVector, DbOptions) ->
-    erlang:put(io_priority, {interactive, DbName}),
+    set_io_priority(DbName, DbOptions),
     #changes_args{dir=Dir} = Args = lists:keyfind(changes_args, 1, Options),
     case get_or_create_db(DbName, DbOptions) of
     {ok, Db} ->
@@ -57,6 +57,7 @@ changes(DbName, Options, StartVector, DbOptions) ->
     end.
 
 all_docs(DbName, Options, #mrargs{keys=undefined} = Args0) ->
+    set_io_priority(DbName, Options),
     Args = fix_skip_and_limit(Args0),
     {ok, Db} = get_or_create_db(DbName, Options),
     VAcc0 = #vacc{db=Db},
@@ -67,6 +68,7 @@ map_view(DbName, DDoc, ViewName, Args0) ->
     map_view(DbName, DDoc, ViewName, Args0, []).
 
 map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
+    set_io_priority(DbName, DbOptions),
     Args = fix_skip_and_limit(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
     VAcc0 = #vacc{db=Db},
@@ -77,6 +79,7 @@ reduce_view(DbName, DDoc, ViewName, Args0) ->
     reduce_view(DbName, DDoc, ViewName, Args0, []).
 
 reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
+    set_io_priority(DbName, DbOptions),
     Args = fix_skip_and_limit(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
     VAcc0 = #vacc{db=Db},


[19/50] fabric commit: updated refs/heads/windsor-merge-121 to 79e6e2f

Posted by rn...@apache.org.
Remove (not that successful) downgrade trick


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/5d007482
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/5d007482
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/5d007482

Branch: refs/heads/windsor-merge-121
Commit: 5d007482b48c331d7f8b82d29ce4f9b36dd465f0
Parents: 60e93b0
Author: Robert Newson <ro...@cloudant.com>
Authored: Tue Sep 17 13:31:13 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Thu Jul 31 11:12:43 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl          | 9 ++++-----
 src/fabric_view_changes.erl | 2 +-
 2 files changed, 5 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/5d007482/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 7b4473f..2cc31ff 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -277,14 +277,13 @@ changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->
         {Go, {Db, Seq, Args, Options}}
     end.
 
-% TODO change to {Seq, uuid(Db)}
 changes_row(Db, #doc_info{id=Id, high_seq=Seq}=DI, Results, Del, true, Opts) ->
     Doc = doc_member(Db, DI, Opts),
-    #change{key={Seq, uuid(Db), []}, id=Id, value=Results, doc=Doc, deleted=Del};
+    #change{key={Seq, uuid(Db)}, id=Id, value=Results, doc=Doc, deleted=Del};
 changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) ->
-    #change{key={Seq, uuid(Db), []}, id=Id, value=Results, deleted=true};
+    #change{key={Seq, uuid(Db)}, id=Id, value=Results, deleted=true};
 changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) ->
-    #change{key={Seq, uuid(Db), []}, id=Id, value=Results}.
+    #change{key={Seq, uuid(Db)}, id=Id, value=Results}.
 
 doc_member(Shard, DocInfo, Opts) ->
     case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of
@@ -368,7 +367,7 @@ set_io_priority(DbName, Options) ->
 
 calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) ->
     Seq;
-calculate_start_seq(Db, Node, {Seq, Uuid, _}) -> % remove me
+calculate_start_seq(Db, Node, {Seq, Uuid, _}) -> % downgrade clause
     calculate_start_seq(Db, Node, {Seq, Uuid});
 calculate_start_seq(Db, Node, {Seq, Uuid}) ->
     case is_prefix(Uuid, couch_db:get_uuid(Db)) of

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/5d007482/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index a54748a..a6c468e 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -297,7 +297,7 @@ pack_seqs(Workers) ->
     Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])),
     [SeqSum, Opaque].
 
-seq({Seq, _Uuid, _Node}) -> Seq; % remove me
+seq({Seq, _Uuid, _Node}) -> Seq; % downgrade clause
 seq({Seq, _Uuid}) -> Seq;
 seq(Seq)          -> Seq.