You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2014/08/01 16:34:22 UTC

[01/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Repository: couchdb-fabric
Updated Branches:
  refs/heads/windsor-merge 4ec3f1150 -> b1c0030fa (forced update)


Implement fabric_view:handle_worker_exit/3

This is a simple utility function that we can use to cancel a streaming
response when a worker dies. By the time we get here we know that
progress is no longer possible so we report the error and abort the
stream.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/36a2d2d1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/36a2d2d1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/36a2d2d1

Branch: refs/heads/windsor-merge
Commit: 36a2d2d1d1317e3824445f47f57efd1868bbe9cb
Parents: dc1f014
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Sep 10 20:58:38 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:40 2014 +0100

----------------------------------------------------------------------
 src/fabric_view.erl | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/36a2d2d1/src/fabric_view.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view.erl b/src/fabric_view.erl
index be41ce1..690625b 100644
--- a/src/fabric_view.erl
+++ b/src/fabric_view.erl
@@ -14,7 +14,7 @@
 
 -export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
     transform_row/1, keydict/1, extract_view/4, get_shards/2,
-    check_down_shards/2]).
+    check_down_shards/2, handle_worker_exit/3]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -37,6 +37,13 @@ check_down_shards(Collector, BadNode) ->
             {ok, Collector}
     end.
 
+%% @doc Handle a worker that dies during a stream
+-spec handle_worker_exit(#collector{}, #shard{}, any()) -> {error, any()}.
+handle_worker_exit(Collector, _Worker, Reason) ->
+    #collector{callback=Callback, user_acc=Acc} = Collector,
+    {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
+    {error, Resp}.
+
 %% @doc looks for a fully covered keyrange in the list of counters
 -spec is_progress_possible([{#shard{}, term()}]) -> boolean().
 is_progress_possible([]) ->


[08/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Replace read_repair logs with a metric

Successful read repair notices aren't overly informative. To reduce log
spam we'll just switch to using a metric and only log when we don't have
a successful response.

BugzId: 24262


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/a505676b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/a505676b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/a505676b

Branch: refs/heads/windsor-merge
Commit: a505676b072e59814f59bdb46a1856d8df79c77b
Parents: 865b555
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Oct 17 15:35:39 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_doc_open.erl      | 7 ++++++-
 src/fabric_doc_open_revs.erl | 7 ++++++-
 2 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/a505676b/src/fabric_doc_open.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_open.erl b/src/fabric_doc_open.erl
index 77b6337..26bd2a3 100644
--- a/src/fabric_doc_open.erl
+++ b/src/fabric_doc_open.erl
@@ -120,7 +120,12 @@ read_repair(#acc{dbname=DbName, replies=Replies}) ->
         Ctx = #user_ctx{roles=[<<"_admin">>]},
         Opts = [replicated_changes, {user_ctx, Ctx}],
         Res = fabric:update_docs(DbName, Docs, Opts),
-        couch_log:notice("read_repair ~s ~s ~p", [DbName, Id, Res]),
+        case Res of
+            {ok, []} ->
+                ok;
+            _ ->
+                couch_log:notice("read_repair ~s ~s ~p", [DbName, Id, Res])
+        end,
         choose_reply(Docs);
     [] ->
         % Try hard to return some sort of information

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/a505676b/src/fabric_doc_open_revs.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_open_revs.erl b/src/fabric_doc_open_revs.erl
index 662990a..31d7616 100644
--- a/src/fabric_doc_open_revs.erl
+++ b/src/fabric_doc_open_revs.erl
@@ -167,7 +167,12 @@ maybe_execute_read_repair(Db, Docs) ->
     [#doc{id=Id} | _] = Docs,
     Ctx = #user_ctx{roles=[<<"_admin">>]},
     Res = fabric:update_docs(Db, Docs, [replicated_changes, {user_ctx,Ctx}]),
-    couch_log:notice("read_repair ~s ~s ~p", [Db, Id, Res]).
+    case Res of
+        {ok, []} ->
+            ok;
+        _ ->
+            couch_log:notice("read_repair ~s ~s ~p", [Db, Id, Res])
+    end.
 
 % hackery required so that not_found sorts first
 strip_not_found_missing([]) ->


[05/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Implement fabric_util:stream_start/2

This is a utility function that handles gathering the start of a rexi
stream. It will return a single full hash ring of shards or an error.

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/bea3052a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/bea3052a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/bea3052a

Branch: refs/heads/windsor-merge
Commit: bea3052ae5dc5faeba0f7d072c9419e14487127c
Parents: 28a9e3d
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Sep 6 07:26:32 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:40 2014 +0100

----------------------------------------------------------------------
 src/fabric_util.erl | 52 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 52 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/bea3052a/src/fabric_util.erl
----------------------------------------------------------------------
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index c93efda..f11abe3 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -16,6 +16,7 @@
         update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
         remove_down_workers/2]).
 -export([request_timeout/0]).
+-export([stream_start/2]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -44,6 +45,57 @@ submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
 cleanup(Workers) ->
     [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
 
+stream_start(Workers0, Keypos) ->
+    Fun = fun handle_stream_start/3,
+    Acc = fabric_dict:init(Workers0, waiting),
+    Timeout = request_timeout(),
+    case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
+        {ok, Workers} ->
+            true = fabric_view:is_progress_possible(Workers),
+            AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
+                rexi:stream_start(From),
+                [Worker | WorkerAcc]
+            end, [], Workers),
+            {ok, AckedWorkers};
+        Else ->
+            Else
+    end.
+
+handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
+    case fabric_util:remove_down_workers(State, NodeRef) of
+    {ok, NewState} ->
+        {ok, NewState};
+    error ->
+        Reason = {nodedown, <<"progress not possible">>},
+        {error, Reason}
+    end;
+handle_stream_start({rexi_EXIT, Reason}, Worker, State) ->
+    NewState = fabric_dict:erase(Worker, State),
+    case fabric_view:is_progress_possible(NewState) of
+    true ->
+        {ok, NewState};
+    false ->
+        {error, fabric_util:error_info(Reason)}
+    end;
+handle_stream_start(rexi_STREAM_INIT, {Worker, From}, State) ->
+    case fabric_dict:lookup_element(Worker, State) of
+    undefined ->
+        % This worker lost the race with other partition copies, terminate
+        rexi:stream_cancel(From),
+        {ok, State};
+    waiting ->
+        % Don't ack the worker yet so they don't start sending us
+        % rows until we're ready
+        NewState0 = fabric_dict:store(Worker, From, State),
+        NewState1 = fabric_view:remove_overlapping_shards(Worker, NewState0),
+        case fabric_dict:any(waiting, NewState1) of
+            true -> {ok, NewState1};
+            false -> {stop, NewState1}
+        end
+    end;
+handle_stream_start(Else, _, _) ->
+    exit({invalid_stream_start, Else}).
+
 recv(Workers, Keypos, Fun, Acc0) ->
     rexi_utils:recv(Workers, Keypos, Fun, Acc0, request_timeout(), infinity).
 


[35/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Report number of pending changes in shard

Also refactor the _changes accumulator into a record.

BugzID: 24236


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/9de10969
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/9de10969
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/9de10969

Branch: refs/heads/windsor-merge
Commit: 9de109699509ffb10987c3c190a0b2190efefb94
Parents: 161f088
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu Oct 17 14:17:12 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 47 +++++++++++++++++++++++++++++++++--------------
 1 file changed, 33 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/9de10969/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index cbc913e..9b6c217 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -27,6 +27,14 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
+-record (cacc, {
+    db,
+    seq,
+    args,
+    options,
+    pending
+}).
+
 %% rpc endpoints
 %%  call to with_db will supply your M:F with a #db{} and then remaining args
 
@@ -44,11 +52,20 @@ changes(DbName, Options, StartVector, DbOptions) ->
         StartSeq = calculate_start_seq(Db, node(), StartVector),
         Enum = fun changes_enumerator/2,
         Opts = [{dir,Dir}],
-        Acc0 = {Db, StartSeq, Args, Options},
+        Acc0 = #cacc{
+          db = Db,
+          seq = StartSeq,
+          args = Args,
+          options = Options,
+          pending = couch_db:count_changes_since(Db, StartSeq)
+        },
         try
-            {ok, {_, LastSeq, _, _}} =
+            {ok, #cacc{seq=LastSeq, pending=Pending}} =
                 couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0),
-            rexi:reply({complete, [{seq, {LastSeq, uuid(Db)}}]})
+            rexi:reply({complete, [
+                {seq, {LastSeq, uuid(Db)}},
+                {pending, Pending}
+            ]})
         after
             couch_db:close(Db)
         end;
@@ -297,22 +314,24 @@ send(Key, Value, Acc) ->
         end
     end.
 
-changes_enumerator(#doc_info{id= <<"_local/", _/binary>>, high_seq=Seq},
-        {Db, _OldSeq, Args, Options}) ->
-    {ok, {Db, Seq, Args, Options}};
-changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->
-    #changes_args{
-        include_docs = IncludeDocs,
-        filter = Acc
-    } = Args,
+changes_enumerator(#doc_info{id= <<"_local/", _/binary>>, high_seq=Seq}, Acc) ->
+    {ok, Acc#cacc{seq = Seq, pending = Acc#cacc.pending-1}};
+changes_enumerator(DocInfo, Acc) ->
+    #cacc{
+        db = Db,
+        args = #changes_args{include_docs = IncludeDocs, filter = Filter},
+        options = Options,
+        pending = Pending
+    } = Acc,
     Conflicts = proplists:get_value(conflicts, Options, false),
     #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del}|_]} = DocInfo,
-    case [X || X <- couch_changes:filter(Db, DocInfo, Acc), X /= null] of
+    case [X || X <- couch_changes:filter(Db, DocInfo, Filter), X /= null] of
     [] ->
-        {ok, {Db, Seq, Args, Options}};
+        {ok, Acc#cacc{seq = Seq, pending = Pending-1}};
     Results ->
         Opts = if Conflicts -> [conflicts]; true -> [] end,
         ChangesRow = {change, [
+	    {pending, Pending-1},
             {seq, {Seq, uuid(Db)}},
             {id, Id},
             {changes, Results},
@@ -320,7 +339,7 @@ changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->
             if IncludeDocs -> [doc_member(Db, DocInfo, Opts)]; true -> [] end
         ]},
         Go = rexi:sync_reply(ChangesRow),
-        {Go, {Db, Seq, Args, Options}}
+        {Go, Acc#cacc{seq = Seq, pending = Pending-1}}
     end.
 
 doc_member(Shard, DocInfo, Opts) ->


[13/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Update reduce view coordinator to use new RPC APIs

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/df9cb854
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/df9cb854
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/df9cb854

Branch: refs/heads/windsor-merge
Commit: df9cb854bd4a9adb659572fb54fce091b873318f
Parents: 11112ad
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Sep 4 16:00:01 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_reduce.erl | 80 ++++++++++++++++++++---------------------
 1 file changed, 40 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/df9cb854/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 12bed6d..42b889b 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -19,11 +19,35 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
-go(DbName, DDoc, VName, Args, Callback, Acc0, {red, {_, Lang, _}, _}=VInfo) ->
-    Shards = fabric_view:get_shards(DbName, Args),
-    Workers = fabric_util:submit_jobs(Shards, reduce_view, [DDoc, VName, Args]),
+go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) ->
+    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
+    go(DbName, DDoc, View, Args, Callback, Acc0, VInfo);
+
+go(DbName, DDoc, VName, Args, Callback, Acc, {red, {_, Lang, _}, _}=VInfo) ->
     RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
-    RexiMon = fabric_util:create_monitors(Workers),
+    Workers0 = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
+        Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,DDoc,VName,Args]}),
+        Shard#shard{ref = Ref}
+    end, fabric_view:get_shards(DbName, Args)),
+    RexiMon = fabric_util:create_monitors(Workers0),
+    try
+        case fabric_util:stream_start(Workers0, #shard.ref) of
+            {ok, Workers} ->
+                try
+                    go2(DbName, Workers, Lang, RedSrc, Args, Callback, Acc)
+                after
+                    fabric_util:cleanup(Workers)
+                end;
+            {timeout, _} ->
+                Callback({error, timeout}, Acc);
+            {error, Error} ->
+                Callback({error, Error}, Acc)
+        end
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+go2(DbName, Workers, Lang, RedSrc, Args, Callback, Acc0) ->
     #mrargs{limit = Limit, skip = Skip, keys = Keys} = Args,
     OsProc = case os_proc_needed(RedSrc) of
         true -> couch_query_servers:get_os_process(Lang);
@@ -52,27 +76,16 @@ go(DbName, DDoc, VName, Args, Callback, Acc0, {red, {_, Lang, _}, _}=VInfo) ->
     {error, Resp} ->
         {ok, Resp}
     after
-        rexi_monitor:stop(RexiMon),
-        fabric_util:cleanup(Workers),
-        case State#collector.os_proc of
-            nil -> ok;
-            OsProc -> catch couch_query_servers:ret_os_process(OsProc)
+        if OsProc == nil -> ok; true ->
+            catch couch_query_servers:ret_os_process(OsProc)
         end
     end.
 
 handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
-    fabric_view:remove_down_shards(State, NodeRef);
+    fabric_view:check_down_shards(State, NodeRef);
 
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
-    #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-        {error, Resp}
-    end;
+    fabric_view:handle_worker_exit(State, Worker, Reason);
 
 %% HACK: this just sends meta once. Instead we should move the counter logic
 %% from the #view_row handle_message below into this function and and pass the
@@ -100,29 +113,16 @@ handle_message({meta, Meta}, {_Worker, From}, State) ->
 
 handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
     #collector{counters = Counters0, rows = Rows0} = State,
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate it
-        gen_server:reply(From, stop),
-        {ok, State};
-    _ ->
-        Rows = dict:append(Key, Row#view_row{worker={Worker, From}}, Rows0),
-        C1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        % TODO time this call, if slow don't do it every time
-        C2 = fabric_view:remove_overlapping_shards(Worker, C1),
-        State1 = State#collector{rows=Rows, counters=C2},
-        fabric_view:maybe_send_row(State1)
-    end;
+    true = fabric_dict:is_key(Worker, Counters0),
+    Rows = dict:append(Key, Row#view_row{worker={Worker, From}}, Rows0),
+    C1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    State1 = State#collector{rows=Rows, counters=C1},
+    fabric_view:maybe_send_row(State1);
 
 handle_message(complete, Worker, #collector{counters = Counters0} = State) ->
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        {ok, State};
-    _ ->
-        C1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        C2 = fabric_view:remove_overlapping_shards(Worker, C1),
-        fabric_view:maybe_send_row(State#collector{counters = C2})
-    end.
+    true = fabric_dict:is_key(Worker, Counters0),
+    C1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    fabric_view:maybe_send_row(State#collector{counters = C1}).
 
 complete_worker_test() ->
     meck:new(config),


[49/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Fix logging of errors in _all_docs open_doc calls

Our exception handling accidentally captured the successful return
value, which open_doc_int passed back via exit/1 inside the try body.
Using try/of/catch we can exit from the of clause without that exit
being caught.

BugzId: 26162


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/a01d2395
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/a01d2395
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/a01d2395

Branch: refs/heads/windsor-merge
Commit: a01d2395fd6970c7802b668898aa7eb1c3ea73a4
Parents: 1fced48
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Dec 13 13:56:44 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:43 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_all_docs.erl | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/a01d2395/src/fabric_view_all_docs.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl
index 4403146..d4dec09 100644
--- a/src/fabric_view_all_docs.erl
+++ b/src/fabric_view_all_docs.erl
@@ -206,8 +206,9 @@ doc_receive_loop(Keys, Pids, SpawnFun, MaxJobs, Callback, AccIn) ->
 
 
 open_doc(DbName, Options, Id, IncludeDocs) ->
-    try
-        open_doc_int(DbName, Options, Id, IncludeDocs)
+    try open_doc_int(DbName, Options, Id, IncludeDocs) of
+        #view_row{} = Row ->
+            exit(Row)
     catch Type:Reason ->
         Stack = erlang:get_stacktrace(),
         couch_log:error("_all_docs open error: ~s ~s :: ~w ~w", [
@@ -231,7 +232,7 @@ open_doc_int(DbName, Options, Id, IncludeDocs) ->
         Value = {[{rev,couch_doc:rev_to_str({RevPos, RevId})}]},
         #view_row{key=Id, id=Id, value=Value}
     end,
-    exit(if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end).
+    if IncludeDocs -> Row#view_row{doc=Doc}; true -> Row end.
 
 cancel_read_pids(Pids) ->
     case queue:out(Pids) of


[03/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Use the dollar_endonly option to prevent trailing newlines in database names

BugzID: 24203


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/503fc0f4
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/503fc0f4
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/503fc0f4

Branch: refs/heads/windsor-merge
Commit: 503fc0f4d058d9a648d32cad4f3b657f79b1b78e
Parents: 455afa7
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Wed Oct 16 13:56:35 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:40 2014 +0100

----------------------------------------------------------------------
 src/fabric_db_create.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/503fc0f4/src/fabric_db_create.erl
----------------------------------------------------------------------
diff --git a/src/fabric_db_create.erl b/src/fabric_db_create.erl
index c8f2d45..5a8334f 100644
--- a/src/fabric_db_create.erl
+++ b/src/fabric_db_create.erl
@@ -42,7 +42,7 @@ validate_dbname(DbName, Options) ->
     false ->
         ok;
     true ->
-        case re:run(DbName, ?DBNAME_REGEX, [{capture,none}]) of
+        case re:run(DbName, ?DBNAME_REGEX, [{capture,none}, dollar_endonly]) of
         match ->
             ok;
         nomatch when DbName =:= <<"_users">> ->


[26/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Pass document id & rev rather than the whole document

This uses the ddoc_cache support for revisions. It should
reduce bandwidth overhead of copying documents around.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/df0c3a33
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/df0c3a33
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/df0c3a33

Branch: refs/heads/windsor-merge
Commit: df0c3a338d256e77420157c2459c763233389de4
Parents: 1f04561
Author: Brian Mitchell <br...@cloudant.com>
Authored: Mon Oct 21 13:14:40 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl         | 14 ++++++++------
 src/fabric_util.erl        |  7 ++++++-
 src/fabric_view_map.erl    |  2 +-
 src/fabric_view_reduce.erl |  2 +-
 4 files changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/df0c3a33/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index d61da40..cee30f2 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -64,10 +64,11 @@ all_docs(DbName, Options, #mrargs{keys=undefined} = Args0) ->
     couch_mrview:query_all_docs(Db, Args, fun view_cb/2, VAcc0).
 
 %% @equiv map_view(DbName, DDoc, ViewName, Args0, [])
-map_view(DbName, DDoc, ViewName, Args0) ->
-    map_view(DbName, DDoc, ViewName, Args0, []).
+map_view(DbName, DDocInfo, ViewName, Args0) ->
+    map_view(DbName, DDocInfo, ViewName, Args0, []).
 
-map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
+map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
+    {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev),
     set_io_priority(DbName, DbOptions),
     Args = fix_skip_and_limit(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
@@ -75,10 +76,11 @@ map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0).
 
 %% @equiv reduce_view(DbName, DDoc, ViewName, Args0)
-reduce_view(DbName, DDoc, ViewName, Args0) ->
-    reduce_view(DbName, DDoc, ViewName, Args0, []).
+reduce_view(DbName, DDocInfo, ViewName, Args0) ->
+    reduce_view(DbName, DDocInfo, ViewName, Args0, []).
 
-reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
+reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
+    {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev),
     set_io_priority(DbName, DbOptions),
     Args = fix_skip_and_limit(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/df0c3a33/src/fabric_util.erl
----------------------------------------------------------------------
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index 344ead7..642d7d6 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -14,10 +14,12 @@
 
 -export([submit_jobs/3, submit_jobs/4, cleanup/1, recv/4, get_db/1, get_db/2, error_info/1,
         update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
-        remove_down_workers/2]).
+        remove_down_workers/2, doc_id_and_rev/1]).
 -export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0]).
 -export([stream_start/2, stream_start/4]).
 
+-compile({inline, [{doc_id_and_rev,1}]}).
+
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
@@ -271,3 +273,6 @@ remove_ancestors_test() ->
 %% test function
 kv(Item, Count) ->
     {make_key(Item), {Item,Count}}.
+
+doc_id_and_rev(#doc{id=DocId, revs={RevNum, [RevHash|_]}}) ->
+    {DocId, {RevNum, RevHash}}.

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/df0c3a33/src/fabric_view_map.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index d2c010a..1201daf 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -26,7 +26,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
 go(DbName, DDoc, View, Args, Callback, Acc) ->
     Shards = fabric_view:get_shards(DbName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
-    RPCArgs = [DDoc, View, Args],
+    RPCArgs = [fabric_util:doc_id_and_rev(DDoc), View, Args],
     StartFun = fun(Shard) ->
         hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
     end,

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/df0c3a33/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 67c36a3..2e0d1f2 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -25,7 +25,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
 
 go(DbName, DDoc, VName, Args, Callback, Acc, {red, {_, Lang, _}, _}=VInfo) ->
     RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
-    RPCArgs = [DDoc, VName, Args],
+    RPCArgs = [fabric_util:doc_id_and_rev(DDoc), VName, Args],
     Shards = fabric_view:get_shards(DbName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
     StartFun = fun(Shard) ->


[02/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Add fabric_dict:is_key/2

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/32225113
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/32225113
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/32225113

Branch: refs/heads/windsor-merge
Commit: 322251130034319500b08a382c02a087be83cbc6
Parents: 503fc0f
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Sep 4 15:58:42 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:40 2014 +0100

----------------------------------------------------------------------
 src/fabric_dict.erl | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/32225113/src/fabric_dict.erl
----------------------------------------------------------------------
diff --git a/src/fabric_dict.erl b/src/fabric_dict.erl
index a9d7fea..20700c0 100644
--- a/src/fabric_dict.erl
+++ b/src/fabric_dict.erl
@@ -19,6 +19,8 @@
 init(Keys, InitialValue) ->
     orddict:from_list([{Key, InitialValue} || Key <- Keys]).
 
+is_key(Key, Dict) ->
+    orddict:is_key(Key, Dict).
 
 decrement_all(Dict) ->
     [{K,V-1} || {K,V} <- Dict].


[14/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Update map view coordinator to use new RPC APIs

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/11112ad1
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/11112ad1
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/11112ad1

Branch: refs/heads/windsor-merge
Commit: 11112ad1655426fd5689a6172866631e18a62eea
Parents: 216ffb3
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Sep 6 07:26:52 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_map.erl | 100 +++++++++++++++++++++++--------------------
 1 file changed, 54 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/11112ad1/src/fabric_view_map.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index eb30179..cf70568 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -23,9 +23,29 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
     {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
     go(DbName, DDoc, View, Args, Callback, Acc0);
 
-go(DbName, DDoc, View, Args, Callback, Acc0) ->
+go(DbName, DDoc, View, Args, Callback, Acc) ->
     Shards = fabric_view:get_shards(DbName, Args),
-    Workers = fabric_util:submit_jobs(Shards, map_view, [DDoc, View, Args]),
+    Workers0 = fabric_util:submit_jobs(
+            Shards, fabric_rpc, map_view, [DDoc, View, Args]),
+    RexiMon = fabric_util:create_monitors(Workers0),
+    try
+        case fabric_util:stream_start(Workers0, #shard.ref) of
+            {ok, Workers} ->
+                try
+                    go(DbName, Workers, Args, Callback, Acc)
+                after
+                    fabric_util:cleanup(Workers)
+                end;
+            {timeout, _} ->
+                Callback({error, timeout}, Acc);
+            {error, Error} ->
+                Callback({error, Error}, Acc)
+        end
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+go(DbName, Workers, Args, Callback, Acc0) ->
     #mrargs{limit = Limit, skip = Skip, keys = Keys} = Args,
     State = #collector{
         db_name=DbName,
@@ -38,8 +58,7 @@ go(DbName, DDoc, View, Args, Callback, Acc0) ->
         sorted = Args#mrargs.sorted,
         user_acc = Acc0
     },
-    RexiMon = fabric_util:create_monitors(Workers),
-    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
+    case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
         State, infinity, 1000 * 60 * 60) of
     {ok, NewState} ->
         {ok, NewState#collector.user_acc};
@@ -47,24 +66,18 @@ go(DbName, DDoc, View, Args, Callback, Acc0) ->
         Callback({error, timeout}, NewState#collector.user_acc);
     {error, Resp} ->
         {ok, Resp}
-    after
-        rexi_monitor:stop(RexiMon),
-        fabric_util:cleanup(Workers)
     end.
 
 handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
-    fabric_view:remove_down_shards(State, NodeRef);
+    fabric_view:check_down_shards(State, NodeRef);
 
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
-    #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-        {error, Resp}
-    end;
+    fabric_view:handle_worker_exit(State, Worker, Reason);
+
+handle_message({rexi_EXIT, Reason}, _, State) ->
+    #collector{callback=Callback, user_acc=Acc} = State,
+    {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
+    {error, Resp};
 
 handle_message({meta, Meta0}, {Worker, From}, State) ->
     Tot = couch_util:get_value(total, Meta0, 0),
@@ -76,35 +89,30 @@ handle_message({meta, Meta0}, {Worker, From}, State) ->
         offset = Offset0,
         user_acc = AccIn
     } = State,
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate
-        gen_server:reply(From, stop),
-        {ok, State};
-    0 ->
-        gen_server:reply(From, ok),
-        Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
-        Total = Total0 + Tot,
-        Offset = Offset0 + Off,
-        case fabric_dict:any(0, Counters2) of
-        true ->
-            {ok, State#collector{
-                counters = Counters2,
-                total_rows = Total,
-                offset = Offset
-            }};
-        false ->
-            FinalOffset = erlang:min(Total, Offset+State#collector.skip),
-            Meta = [{total, Total}, {offset, FinalOffset}],
-            {Go, Acc} = Callback({meta, Meta}, AccIn),
-            {Go, State#collector{
-                counters = fabric_dict:decrement_all(Counters2),
-                total_rows = Total,
-                offset = FinalOffset,
-                user_acc = Acc
-            }}
-        end
+    % Assert that we don't have other messages from this
+    % worker when the total_and_offset message arrives.
+    0 = fabric_dict:lookup_element(Worker, Counters0),
+    rexi:stream_ack(From),
+    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    Total = Total0 + Tot,
+    Offset = Offset0 + Off,
+    case fabric_dict:any(0, Counters1) of
+    true ->
+        {ok, State#collector{
+            counters = Counters1,
+            total_rows = Total,
+            offset = Offset
+        }};
+    false ->
+        FinalOffset = erlang:min(Total, Offset+State#collector.skip),
+        Meta = [{total, Total}, {offset, FinalOffset}],
+        {Go, Acc} = Callback({meta, Meta}, AccIn),
+        {Go, State#collector{
+            counters = fabric_dict:decrement_all(Counters1),
+            total_rows = Total,
+            offset = FinalOffset,
+            user_acc = Acc
+        }}
     end;
 
 handle_message(#view_row{}, {_, _}, #collector{limit=0} = State) ->


[11/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Provide replacement shards in changes feeds

BugzId: 20423


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/cb388f29
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/cb388f29
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/cb388f29

Branch: refs/heads/windsor-merge
Commit: cb388f29efcd1597f5c7ad35a361055f4bb91651
Parents: 18f6c8e
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Sep 12 12:25:33 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/cb388f29/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 69d9d09..19824bc 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -131,14 +131,22 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
         end
     end, unpack_seqs(PackedSeqs, DbName)),
     {Workers0, _} = lists:unzip(Seqs),
+    Repls = fabric_view:get_shard_replacements(DbName, Workers0),
+    StartFun = fun(#shard{name=Name, node=N}=Shard) ->
+        Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, 0]}),
+        Shard#shard{ref = Ref}
+    end,
     RexiMon = fabric_util:create_monitors(Workers0),
     try
-        case fabric_util:stream_start(Workers0, #shard.ref) of
+        case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
             {ok, Workers} ->
                 try
-                    LiveSeqs = lists:filter(fun({W, _S}) ->
-                        lists:member(W, Workers)
-                    end, Seqs),
+                    LiveSeqs = lists:map(fun(W) ->
+                        case lists:keyfind(W, 1, Seqs) of
+                            {W, Seq} -> {W, Seq};
+                            _ -> {W, 0}
+                        end
+                    end, Workers),
                     send_changes(DbName, Workers, LiveSeqs, ChangesArgs,
                             Callback, AccIn, Timeout)
                 after


[42/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Clear worker references for get_shard_replacements

This function relies on record equality. If a request passed workers
that had references defined, it would incorrectly return all copies of
the shard range.

BugzId: 28992


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/992cf38e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/992cf38e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/992cf38e

Branch: refs/heads/windsor-merge
Commit: 992cf38e20c80a32be7ae219576be121f8dd51be
Parents: 63ec0f9
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Mar 12 02:25:35 2014 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:43 2014 +0100

----------------------------------------------------------------------
 src/fabric_view.erl | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/992cf38e/src/fabric_view.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view.erl b/src/fabric_view.erl
index 7cd5321..32369cd 100644
--- a/src/fabric_view.erl
+++ b/src/fabric_view.erl
@@ -306,10 +306,11 @@ get_shards(DbName, #mrargs{stale=Stale})
 get_shards(DbName, #mrargs{stale=false}) ->
     mem3:shards(DbName).
 
-get_shard_replacements(DbName, UsedShards) ->
+get_shard_replacements(DbName, UsedShards0) ->
     % We only want to generate a replacements list from shards
     % that aren't already used.
     AllLiveShards = mem3:live_shards(DbName, [node() | nodes()]),
+    UsedShards = [S#shard{ref=undefined} || S <- UsedShards0],
     UnusedShards = AllLiveShards -- UsedShards,
 
     % If we have more than one copy of a range then we don't


[32/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Use smarter seqs when original shard is down/gone

If a shard that contributes to a sequence a) is on a down node or b) is
no longer part of the shard set we can provide enough detail to the
replacement workers to allow them to choose a smarter replacement
sequence.

This patch does not cover the case where the shard that originally
contributed to the sequence is alive but crashes before initializing the
stream to the coordinator.  In that case we're still firing up
replacement workers and asking them to stream from zero.  The
stream_start semantics make reworking that bit a tad trickier.

BugzID: 22698


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/f4616ee4
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/f4616ee4
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/f4616ee4

Branch: refs/heads/windsor-merge
Commit: f4616ee4ba653da19fe386db789732dc9a53308f
Parents: 5d52436
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Sun Nov 17 18:16:07 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/f4616ee4/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 3ca60b1..4ee87e4 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -125,7 +125,8 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
             % TODO It's possible in rare cases of shard merging to end up
             % with overlapping shard ranges from this technique
             lists:map(fun(#shard{name=Name2, node=N2} = NewShard) ->
-                Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2,ChangesArgs,0]}),
+                Ref = rexi:cast(N2, {fabric_rpc, changes, [Name2, ChangesArgs,
+                    make_replacement_arg(N, Seq)]}),
                 {NewShard#shard{ref = Ref}, 0}
             end, find_replacement_shards(Shard, AllLiveShards))
         end
@@ -255,6 +256,13 @@ handle_message({complete, Props}, Worker, State) ->
     end,
     {Go, NewState}.
 
+make_replacement_arg(Node, {Seq, Uuid}) ->
+    {replace, Node, Uuid, Seq};
+make_replacement_arg(Node, {Seq, Uuid, _}) ->
+    {replace, Node, Uuid, Seq};
+make_replacement_arg(_, _) ->
+    0.
+
 make_changes_args(#changes_args{style=Style, filter=undefined}=Args) ->
     Args#changes_args{filter = Style};
 make_changes_args(Args) ->


[31/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
[2/2] Use proplist instead of #change record


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/5e65ee0a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/5e65ee0a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/5e65ee0a

Branch: refs/heads/windsor-merge
Commit: 5e65ee0a191ad6b61be6d08ca91c36564f37381d
Parents: 16e020f
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu Oct 17 13:27:58 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 24 +++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/5e65ee0a/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index b0ce9b1..c312aa7 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -48,7 +48,7 @@ changes(DbName, Options, StartVector, DbOptions) ->
         try
             {ok, {_, LastSeq, _, _}} =
                 couch_db:changes_since(Db, StartSeq, Enum, Opts, Acc0),
-            rexi:reply({complete, {LastSeq, uuid(Db)}})
+            rexi:reply({complete, [{seq, {LastSeq, uuid(Db)}}]})
         after
             couch_db:close(Db)
         end;
@@ -318,12 +318,26 @@ changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->
     end.
 
 changes_row(Db, #doc_info{id=Id, high_seq=Seq}=DI, Results, Del, true, Opts) ->
-    Doc = doc_member(Db, DI, Opts),
-    #change{key={Seq, uuid(Db)}, id=Id, value=Results, doc=Doc, deleted=Del};
+    {change, [
+        {seq, {Seq, uuid(Db)}},
+        {id, Id},
+        {changes, Results},
+        {deleted, Del},
+        {doc, doc_member(Db, DI, Opts)}
+    ]};
 changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) ->
-    #change{key={Seq, uuid(Db)}, id=Id, value=Results, deleted=true};
+    {change, [
+        {seq, {Seq, uuid(Db)}},
+        {id, Id},
+        {changes, Results},
+        {deleted, true}
+    ]};
 changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) ->
-    #change{key={Seq, uuid(Db)}, id=Id, value=Results}.
+    {change, [
+        {seq, {Seq, uuid(Db)}},
+        {id, Id},
+        {changes, Results}
+    ]}.
 
 doc_member(Shard, DocInfo, Opts) ->
     case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of


[40/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Move attachment code into couch_att

This is an attempt to isolate the attachment record and
some related code. This will allow seamless upgrades
over time.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/8cefcd8b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/8cefcd8b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/8cefcd8b

Branch: refs/heads/windsor-merge
Commit: 8cefcd8bc21679e2e15b5f1fee35c34707cf7868
Parents: 2e77350
Author: Brian Mitchell <br...@p2p.io>
Authored: Wed Dec 11 23:11:48 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:43 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/8cefcd8b/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 9b6c217..b398969 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -373,7 +373,7 @@ make_att_readers([]) ->
 make_att_readers([#doc{atts=Atts0} = Doc | Rest]) ->
     % % go through the attachments looking for 'follows' in the data,
     % % replace with function that reads the data from MIME stream.
-    Atts = [Att#att{data=make_att_reader(D)} || #att{data=D} = Att <- Atts0],
+    Atts = [couch_att:transform(data, fun make_att_reader/1, Att) || Att <- Atts0],
     [Doc#doc{atts = Atts} | make_att_readers(Rest)].
 
 make_att_reader({follows, Parser, Ref}) ->


[16/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Provide replacement shards in reduce views

BugzId: 20423


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/18f6c8e4
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/18f6c8e4
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/18f6c8e4

Branch: refs/heads/windsor-merge
Commit: 18f6c8e4a40cf047a40a6fe42386f1484321afab
Parents: bc57da8
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Sep 12 10:22:16 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_reduce.erl | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/18f6c8e4/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 16d6fb1..67c36a3 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -25,13 +25,16 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
 
 go(DbName, DDoc, VName, Args, Callback, Acc, {red, {_, Lang, _}, _}=VInfo) ->
     RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
-    Workers0 = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
-        Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,DDoc,VName,Args]}),
-        Shard#shard{ref = Ref}
-    end, fabric_view:get_shards(DbName, Args)),
+    RPCArgs = [DDoc, VName, Args],
+    Shards = fabric_view:get_shards(DbName, Args),
+    Repls = fabric_view:get_shard_replacements(DbName, Shards),
+    StartFun = fun(Shard) ->
+        hd(fabric_util:submit_jobs([Shard], fabric_rpc, reduce_view, RPCArgs))
+    end,
+    Workers0 = fabric_util:submit_jobs(Shards,fabric_rpc,reduce_view,RPCArgs),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
-        case fabric_util:stream_start(Workers0, #shard.ref) of
+        case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
             {ok, Workers} ->
                 try
                     go2(DbName, Workers, Lang, RedSrc, Args, Callback, Acc)


[23/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Collect pending counts and report sum at end

BugzID: 24236


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/7826ce5f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/7826ce5f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/7826ce5f

Branch: refs/heads/windsor-merge
Commit: 7826ce5fa9058bbc1c85ac49be6c6fbb369f5606
Parents: 9de1096
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu Oct 17 16:33:14 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 65 +++++++++++++++++++++++++++++++++-------
 1 file changed, 54 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/7826ce5f/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 36cb945..60dc54c 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -61,7 +61,7 @@ go(DbName, "normal", Options, Callback, Acc0) ->
     case validate_start_seq(DbName, Since) of
     ok ->
         {ok, Acc} = Callback(start, Acc0),
-        {ok, #collector{counters=Seqs, user_acc=AccOut}} = send_changes(
+        {ok, Collector} = send_changes(
             DbName,
             Args,
             Callback,
@@ -69,7 +69,8 @@ go(DbName, "normal", Options, Callback, Acc0) ->
             Acc,
             5000
         ),
-        Callback({stop, pack_seqs(Seqs)}, AccOut);
+        #collector{counters=Seqs, user_acc=AccOut, offset=Offset} = Collector,
+        Callback({stop, pack_seqs(Seqs), pending_count(Offset)}, AccOut);
     Error ->
         Callback(Error, Acc0)
     end.
@@ -77,12 +78,17 @@ go(DbName, "normal", Options, Callback, Acc0) ->
 keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0) ->
     #changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args,
     {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn, Timeout),
-    #collector{limit=Limit2, counters=NewSeqs, user_acc=AccOut} = Collector,
+    #collector{
+        limit = Limit2,
+        counters = NewSeqs,
+        offset = Offset,
+        user_acc = AccOut
+    } = Collector,
     LastSeq = pack_seqs(NewSeqs),
     MaintenanceMode = config:get("cloudant", "maintenance_mode"),
     if Limit > Limit2, Feed == "longpoll";
       MaintenanceMode == "true"; MaintenanceMode == "nolb" ->
-        Callback({stop, LastSeq}, AccOut);
+        Callback({stop, LastSeq, pending_count(Offset)}, AccOut);
     true ->
         WaitForUpdate = wait_db_updated(UpListen),
         AccumulatedTime = timer:now_diff(os:timestamp(), T0) div 1000,
@@ -94,9 +100,9 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0)
         end,
         case {Heartbeat, AccumulatedTime > Max, WaitForUpdate} of
         {undefined, _, timeout} ->
-            Callback({stop, LastSeq}, AccOut);
+            Callback({stop, LastSeq, pending_count(Offset)}, AccOut);
         {_, true, timeout} ->
-            Callback({stop, LastSeq}, AccOut);
+            Callback({stop, LastSeq, pending_count(Offset)}, AccOut);
         _ ->
             {ok, AccTimeout} = Callback(timeout, AccOut),
             ?MODULE:keep_sending_changes(
@@ -183,6 +189,7 @@ send_changes(DbName, Workers, Seqs, ChangesArgs, Callback, AccIn, Timeout) ->
         counters = orddict:from_list(Seqs),
         user_acc = AccIn,
         limit = ChangesArgs#changes_args.limit,
+        offset = fabric_dict:init(Workers, null),
         rows = Seqs % store sequence positions instead
     },
     %% TODO: errors need to be handled here
@@ -207,10 +214,29 @@ handle_message({rexi_EXIT, Reason}, Worker, State) ->
 
 % Temporary upgrade clause - Case 24236
 handle_message({complete, Key}, Worker, State) when is_tuple(Key) ->
-    handle_message({complete, [{seq, Key}]}, Worker, State);
+    handle_message({complete, [{seq, Key}, {pending, 0}]}, Worker, State);
+
+handle_message({change, Props}, {Worker, _}, #collector{limit=0} = State) ->
+    O0 = State#collector.offset,
+    O1 = case fabric_dict:lookup_element(Worker, O0) of
+        null ->
+            % Use Pending+1 because we're ignoring this row in the response
+            Pending = couch_util:get_value(pending, Props),
+            fabric_dict:store(Worker, Pending+1, O0);
+        _ ->
+            O0
+    end,
+    maybe_stop(State#collector{offset = O1});
 
-handle_message(_, _, #collector{limit=0} = State) ->
-    {stop, State};
+handle_message({complete, Props}, Worker, #collector{limit=0} = State) ->
+    O0 = State#collector.offset,
+    O1 = case fabric_dict:lookup_element(Worker, O0) of
+        null ->
+            fabric_dict:store(Worker, couch_util:get_value(pending,Props), O0);
+        _ ->
+            O0
+    end,
+    maybe_stop(State#collector{offset = O1});
 
 handle_message(#change{} = Row, {Worker, From}, St) ->
     Change = {change, [
@@ -226,11 +252,13 @@ handle_message({change, Props}, {Worker, From}, St) ->
     #collector{
         callback = Callback,
         counters = S0,
+        offset = O0,
         limit = Limit,
         user_acc = AccIn
     } = St,
     true = fabric_dict:is_key(Worker, S0),
     S1 = fabric_dict:store(Worker, couch_util:get_value(seq, Props), S0),
+    O1 = fabric_dict:store(Worker, couch_util:get_value(pending, Props), O0),
     % Temporary hack for FB 23637
     Interval = erlang:get(changes_seq_interval),
     if (Interval == undefined) orelse (Limit rem Interval == 0) ->
@@ -240,7 +268,7 @@ handle_message({change, Props}, {Worker, From}, St) ->
     end,
     {Go, Acc} = Callback(changes_row(Props2), AccIn),
     rexi:stream_ack(From),
-    {Go, St#collector{counters=S1, limit=Limit-1, user_acc=Acc}};
+    {Go, St#collector{counters=S1, offset=O1, limit=Limit-1, user_acc=Acc}};
 
 handle_message({no_pass, Seq}, {Worker, From}, St) ->
     #collector{counters = S0} = St,
@@ -253,11 +281,13 @@ handle_message({complete, Props}, Worker, State) ->
     Key = couch_util:get_value(seq, Props),
     #collector{
         counters = S0,
+        offset = O0,
         total_rows = Completed % override
     } = State,
     true = fabric_dict:is_key(Worker, S0),
     S1 = fabric_dict:store(Worker, Key, S0),
-    NewState = State#collector{counters=S1, total_rows=Completed+1},
+    O1 = fabric_dict:store(Worker, couch_util:get_value(pending, Props), O0),
+    NewState = State#collector{counters=S1, offset=O1, total_rows=Completed+1},
     % We're relying on S1 having exactly the number of workers that
     % are participating in this response. With the new stream_start
     % that's a bit more obvious but historically it wasn't quite
@@ -270,6 +300,7 @@ handle_message({complete, Props}, Worker, State) ->
     end,
     {Go, NewState}.
 
+
 make_replacement_arg(Node, {Seq, Uuid}) ->
     {replace, Node, Uuid, Seq};
 make_replacement_arg(Node, {Seq, Uuid, _}) ->
@@ -279,6 +310,15 @@ make_replacement_arg(Node, {Seq, Uuid, _}) ->
 make_replacement_arg(_, _) ->
     0.
 
+maybe_stop(#collector{offset = Offset} = State) ->
+    case fabric_dict:any(null, Offset) of
+        false ->
+            {stop, State};
+        true ->
+            % Wait till we've heard from everyone to compute pending count
+            {ok, State}
+    end.
+
 make_changes_args(#changes_args{style=Style, filter=undefined}=Args) ->
     Args#changes_args{filter = Style};
 make_changes_args(Args) ->
@@ -310,6 +350,9 @@ collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) ->
         end
     end.
 
+pending_count(Dict) ->
+    fabric_dict:fold(fun(_Worker, C, Acc) -> C+Acc end, 0, Dict).
+
 pack_seqs(Workers) ->
     SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers],
     SeqSum = lists:sum([seq(S) || {_,_,S} <- SeqList]),


[44/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Set non-interactive default for set_security

Prior to this commit, fabric:set_security would default to interactive
IO priority. This would cause the security header to be set on all
shards except those on nodes in maintenance mode (MM). Subsequent calls to
fabric:get_security would cause inconsistent security properties to be
applied to the request. This patch sets a non-interactive default for
fabric:set_security, which will cause security updates to be applied to
all nodes regardless of MM by default.

BugzID: 28847


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/bfaa56b2
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/bfaa56b2
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/bfaa56b2

Branch: refs/heads/windsor-merge
Commit: bfaa56b25a6bbcb40bd6047374b4cd4e3a61ab44
Parents: 992cf38
Author: Benjamin Bastian <be...@gmail.com>
Authored: Wed Mar 12 22:50:52 2014 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:43 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/bfaa56b2/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 6c0d47a..b5ec718 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -152,7 +152,13 @@ get_update_seq(DbName) ->
 get_update_seq(DbName, DbOptions) ->
     with_db(DbName, DbOptions, {couch_db, get_update_seq, []}).
 
-set_security(DbName, SecObj, Options) ->
+set_security(DbName, SecObj, Options0) ->
+    Options = case lists:keyfind(io_priority, 1, Options0) of
+        false ->
+            [{io_priority, {db_meta, security}}|Options0];
+        _ ->
+            Options0
+    end,
     with_db(DbName, Options, {couch_db, set_security, [SecObj]}).
 
 get_all_security(DbName, Options) ->


[41/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Only attempt replacement when we have replacements

We weren't checking if we were even provided replacements before using
them. This just adds a guard so we don't end up with a badmatch error on
the `lists:keytake/3`.

BugzId: 26124


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/2e98455e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/2e98455e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/2e98455e

Branch: refs/heads/windsor-merge
Commit: 2e98455e881a8b9d450b1baf831e531c04ceca3d
Parents: 4d9106e
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Dec 12 12:51:46 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:43 2014 +0100

----------------------------------------------------------------------
 src/fabric_util.erl | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/2e98455e/src/fabric_util.erl
----------------------------------------------------------------------
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index 642d7d6..5f59725 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -86,13 +86,14 @@ handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
     end;
 handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
     Workers = fabric_dict:erase(Worker, St#stream_acc.workers),
+    Replacements = St#stream_acc.replacements,
     case {fabric_view:is_progress_possible(Workers), Reason} of
     {true, _} ->
         {ok, St#stream_acc{workers=Workers}};
-    {false, {maintenance_mode, _Node}} ->
+    {false, {maintenance_mode, _Node}} when Replacements /= undefined ->
         % Check if we have replacements for this range
         % and start the new workers if so.
-        case lists:keytake(Worker#shard.range, 1, St#stream_acc.replacements) of
+        case lists:keytake(Worker#shard.range, 1, Replacements) of
             {value, {_Range, WorkerReplacements}, NewReplacements} ->
                 FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
                     NewWorker = (St#stream_acc.start_fun)(Repl),


[09/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Update _all_docs coordinator to use new RPC APIs

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/216ffb3a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/216ffb3a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/216ffb3a

Branch: refs/heads/windsor-merge
Commit: 216ffb3a210ecc3e00ae688be6fb64d4c9a8fb5e
Parents: 36a2d2d
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Sep 4 17:28:57 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_all_docs.erl | 127 ++++++++++++++++++++------------------
 1 file changed, 66 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/216ffb3a/src/fabric_view_all_docs.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl
index e5ed7b3..8dc9173 100644
--- a/src/fabric_view_all_docs.erl
+++ b/src/fabric_view_all_docs.erl
@@ -20,29 +20,26 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
-go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc0) ->
-    Workers = fabric_util:submit_jobs(mem3:shards(DbName),all_docs,[Options, QueryArgs]),
-    #mrargs{limit = Limit, skip = Skip} = QueryArgs,
-    State = #collector{
-        query_args = QueryArgs,
-        callback = Callback,
-        counters = fabric_dict:init(Workers, 0),
-        skip = Skip,
-        limit = Limit,
-        user_acc = Acc0
-    },
-    RexiMon = fabric_util:create_monitors(Workers),
-    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
-        State, infinity, 5000) of
-    {ok, NewState} ->
-        {ok, NewState#collector.user_acc};
-    {timeout, NewState} ->
-        Callback({error, timeout}, NewState#collector.user_acc);
-    {error, Resp} ->
-        {ok, Resp}
+go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
+    Shards = mem3:shards(DbName),
+    Workers0 = fabric_util:submit_jobs(
+            Shards, fabric_rpc, all_docs, [Options, QueryArgs]),
+    RexiMon = fabric_util:create_monitors(Workers0),
+    try
+        case fabric_util:stream_start(Workers0, #shard.ref) of
+            {ok, Workers} ->
+                try
+                    go(DbName, Options, Workers, QueryArgs, Callback, Acc)
+                after
+                    fabric_util:cleanup(Workers)
+                end;
+            {timeout, _} ->
+                Callback({error, timeout}, Acc);
+            {error, Error} ->
+                Callback({error, Error}, Acc)
+        end
     after
-        rexi_monitor:stop(RexiMon),
-        fabric_util:cleanup(Workers)
+        rexi_monitor:stop(RexiMon)
     end;
 
 
@@ -81,19 +78,32 @@ go(DbName, Options, QueryArgs, Callback, Acc0) ->
         Callback(timeout, Acc0)
     end.
 
+go(DbName, _Options, Workers, QueryArgs, Callback, Acc0) ->
+    #mrargs{limit = Limit, skip = Skip} = QueryArgs,
+    State = #collector{
+        db_name = DbName,
+        query_args = QueryArgs,
+        callback = Callback,
+        counters = fabric_dict:init(Workers, 0),
+        skip = Skip,
+        limit = Limit,
+        user_acc = Acc0
+    },
+    case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
+        State, infinity, 5000) of
+    {ok, NewState} ->
+        {ok, NewState#collector.user_acc};
+    {timeout, NewState} ->
+        Callback({error, timeout}, NewState#collector.user_acc);
+    {error, Resp} ->
+        {ok, Resp}
+    end.
+
 handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
-    fabric_view:remove_down_shards(State, NodeRef);
+    fabric_view:check_down_shards(State, NodeRef);
 
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
-    #collector{callback=Callback, counters=Counters0, user_acc=Acc} = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-        {error, Resp}
-    end;
+    fabric_view:handle_worker_exit(State, Worker, Reason);
 
 handle_message({meta, Meta0}, {Worker, From}, State) ->
     Tot = couch_util:get_value(total, Meta0, 0),
@@ -105,35 +115,30 @@ handle_message({meta, Meta0}, {Worker, From}, State) ->
         offset = Offset0,
         user_acc = AccIn
     } = State,
-    case fabric_dict:lookup_element(Worker, Counters0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate
-        gen_server:reply(From, stop),
-        {ok, State};
-    0 ->
-        gen_server:reply(From, ok),
-        Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
-        Counters2 = fabric_view:remove_overlapping_shards(Worker, Counters1),
-        Total = Total0 + Tot,
-        Offset = Offset0 + Off,
-        case fabric_dict:any(0, Counters2) of
-        true ->
-            {ok, State#collector{
-                counters = Counters2,
-                total_rows = Total,
-                offset = Offset
-            }};
-        false ->
-            FinalOffset = erlang:min(Total, Offset+State#collector.skip),
-            Meta = [{total, Total}, {offset, FinalOffset}],
-            {Go, Acc} = Callback({meta, Meta}, AccIn),
-            {Go, State#collector{
-                counters = fabric_dict:decrement_all(Counters2),
-                total_rows = Total,
-                offset = FinalOffset,
-                user_acc = Acc
-            }}
-        end
+    % Assert that we don't have other messages from this
+    % worker when the total_and_offset message arrives.
+    0 = fabric_dict:lookup_element(Worker, Counters0),
+    rexi:stream_ack(From),
+    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
+    Total = Total0 + Tot,
+    Offset = Offset0 + Off,
+    case fabric_dict:any(0, Counters1) of
+    true ->
+        {ok, State#collector{
+            counters = Counters1,
+            total_rows = Total,
+            offset = Offset
+        }};
+    false ->
+        FinalOffset = erlang:min(Total, Offset+State#collector.skip),
+        Meta = [{total, Total}, {offset, FinalOffset}],
+        {Go, Acc} = Callback({meta, Meta}, AccIn),
+        {Go, State#collector{
+            counters = fabric_dict:decrement_all(Counters1),
+            total_rows = Total,
+            offset = FinalOffset,
+            user_acc = Acc
+        }}
     end;
 
 handle_message(#view_row{} = Row, {Worker, From}, State) ->


[24/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Fix inclusion of "doc":"undefined" in changes feed

We were reusing the wrong variable after removing the default values.

BugzId: 24356


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/1f045616
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/1f045616
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/1f045616

Branch: refs/heads/windsor-merge
Commit: 1f045616dda3ec667aabda37c31c843394f56bd9
Parents: ec26cba
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Oct 22 00:02:46 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/1f045616/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 479c32b..3ca60b1 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -382,7 +382,7 @@ changes_row(Props0, IncludeDocs) ->
             lists:keydelete(deleted, 1, Props1)
     end,
     Allowed = [seq, id, changes, deleted, doc],
-    Props3 = lists:filter(fun({K,_V}) -> lists:member(K, Allowed) end, Props0),
+    Props3 = lists:filter(fun({K,_V}) -> lists:member(K, Allowed) end, Props2),
     {change, {Props3}}.
 
 find_replacement_shards(#shard{range=Range}, AllShards) ->


[33/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
[squash] add deprecation info


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/cb16735d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/cb16735d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/cb16735d

Branch: refs/heads/windsor-merge
Commit: cb16735d274649117af663dda7af1534b9c85bce
Parents: 90411dd
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Mon Nov 25 15:31:47 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/cb16735d/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 76c8c16..6d5dd24 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -271,6 +271,8 @@ handle_message({complete, Props}, Worker, State) ->
 make_replacement_arg(Node, {Seq, Uuid}) ->
     {replace, Node, Uuid, Seq};
 make_replacement_arg(Node, {Seq, Uuid, _}) ->
+    %% TODO Deprecated, remove when we're confident no seqs with this format
+    %% are in the wild
     {replace, Node, Uuid, Seq};
 make_replacement_arg(_, _) ->
     0.


[43/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Include epoch node in seqs and replacement logic

The {Seq, UUID, Node} triplet uniquely identifies an update in a
cluster.  Including the node that originally accepted the update in
sequences allows us to defend against byzantine cases of shards moving
back and forth between the nodes in a cluster.

This patch updates the sequence generation to include the full triplet
in each element of the _changes sequence.  It also uses the epoch node
instead of the node currently hosting the shard when determining the
safe replacement sequence.

BugzID: 27193


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/74e4a791
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/74e4a791
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/74e4a791

Branch: refs/heads/windsor-merge
Commit: 74e4a7916da5e3c7400e6d0affd39482647be8b7
Parents: 8cefcd8
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu Jan 16 17:32:48 2014 -0800
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:43 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl          | 18 +++++++++++-------
 src/fabric_view_changes.erl | 11 ++++++-----
 2 files changed, 17 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/74e4a791/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index b398969..6c0d47a 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -32,7 +32,8 @@
     seq,
     args,
     options,
-    pending
+    pending,
+    epochs
 }).
 
 %% rpc endpoints
@@ -57,7 +58,8 @@ changes(DbName, Options, StartVector, DbOptions) ->
           seq = StartSeq,
           args = Args,
           options = Options,
-          pending = couch_db:count_changes_since(Db, StartSeq)
+          pending = couch_db:count_changes_since(Db, StartSeq),
+          epochs = get_epochs(Db)
         },
         try
             {ok, #cacc{seq=LastSeq, pending=Pending}} =
@@ -321,7 +323,8 @@ changes_enumerator(DocInfo, Acc) ->
         db = Db,
         args = #changes_args{include_docs = IncludeDocs, filter = Filter},
         options = Options,
-        pending = Pending
+        pending = Pending,
+        epochs = Epochs
     } = Acc,
     Conflicts = proplists:get_value(conflicts, Options, false),
     #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del}|_]} = DocInfo,
@@ -332,7 +335,7 @@ changes_enumerator(DocInfo, Acc) ->
         Opts = if Conflicts -> [conflicts]; true -> [] end,
         ChangesRow = {change, [
 	    {pending, Pending-1},
-            {seq, {Seq, uuid(Db)}},
+            {seq, {Seq, uuid(Db), owner_of(Seq, Epochs)}},
             {id, Id},
             {changes, Results},
             {deleted, Del} |
@@ -424,12 +427,13 @@ set_io_priority(DbName, Options) ->
 
 calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) ->
     Seq;
-calculate_start_seq(Db, Node, {Seq, Uuid, _}) -> % downgrade clause
-    calculate_start_seq(Db, Node, {Seq, Uuid});
 calculate_start_seq(Db, Node, {Seq, Uuid}) ->
+    % Treat the current node as the epoch node
+    calculate_start_seq(Db, Node, {Seq, Uuid, Node});
+calculate_start_seq(Db, Node, {Seq, Uuid, EpochNode}) ->
     case is_prefix(Uuid, couch_db:get_uuid(Db)) of
         true ->
-            case is_owner(Node, Seq, couch_db:get_epochs(Db)) of
+            case is_owner(EpochNode, Seq, couch_db:get_epochs(Db)) of
                 true -> Seq;
                 false -> 0
             end;

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/74e4a791/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index c4eab22..34670c9 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -305,10 +305,11 @@ handle_message({complete, Props}, Worker, State) ->
 
 make_replacement_arg(Node, {Seq, Uuid}) ->
     {replace, Node, Uuid, Seq};
-make_replacement_arg(Node, {Seq, Uuid, _}) ->
-    %% TODO Deprecated, remove when we're confident no seqs with this format
-    %% are in the wild
-    {replace, Node, Uuid, Seq};
+make_replacement_arg(_Node, {Seq, Uuid, EpochNode}) ->
+    % The replacement should properly be computed aginst the node that owned
+    % the sequence when it was written to disk (the EpochNode) rather than the
+    % node we're trying to replace.
+    {replace, EpochNode, Uuid, Seq};
 make_replacement_arg(_, _) ->
     0.
 
@@ -347,7 +348,7 @@ pack_seqs(Workers) ->
     Opaque = couch_util:encodeBase64Url(term_to_binary(SeqList, [compressed])),
     [SeqSum, Opaque].
 
-seq({Seq, _Uuid, _Node}) -> Seq; % downgrade clause
+seq({Seq, _Uuid, _Node}) -> Seq;
 seq({Seq, _Uuid}) -> Seq;
 seq(Seq)          -> Seq.
 


[22/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Be defensive when calculating pending_count

It's theoretically possible that during the upgrade we end up with a
mixed set of RPC workers. This makes sure that we don't crash the
coordinator with a badarith error.

BugzId: 24236


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/af206990
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/af206990
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/af206990

Branch: refs/heads/windsor-merge
Commit: af206990c0a953eda3d7098bd480ba7b5e72df3a
Parents: 7826ce5
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Dec 10 11:35:22 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/af206990/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 60dc54c..1797539 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -351,7 +351,12 @@ collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) ->
     end.
 
 pending_count(Dict) ->
-    fabric_dict:fold(fun(_Worker, C, Acc) -> C+Acc end, 0, Dict).
+    fabric_dict:fold(fun
+        (_Worker, Count, Acc) when is_integer(Count), is_integer(Acc) ->
+            Count + Acc;
+        (_Worker, _Count, _Acc) ->
+            null
+    end, 0, Dict).
 
 pack_seqs(Workers) ->
     SeqList = [{N,R,S} || {#shard{node=N, range=R}, S} <- Workers],


[46/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Make _changes?descending=true aware of maint. mode

The _changes coordinator was using a one-off thing to get the start
sequence for a database which did not provide any error handling at all.
This patch replaces that one-off with a call to get_db_info/1, which
does include all the normal error handling.

A future enhancement could teach the coordinator to understand a
"sequence" like eof that would cause all the workers to automatically
seek to the end, but I wanted to stay conservative here in order to land
this for the next release.

BugzID: 25944


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/dbcbfc18
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/dbcbfc18
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/dbcbfc18

Branch: refs/heads/windsor-merge
Commit: dbcbfc1842e9cf1c025b0484ade6abd4f1b6cb7c
Parents: a01d239
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu Jan 2 09:18:37 2014 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:43 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 23 ++---------------------
 1 file changed, 2 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/dbcbfc18/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 8de01f1..5d0672e 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -328,30 +328,11 @@ make_changes_args(Args) ->
 
 get_start_seq(DbName, #changes_args{dir=Dir, since=Since})
   when Dir == rev; Since == "now" ->
-    Shards = mem3:shards(DbName),
-    Workers = fabric_util:submit_jobs(Shards, get_update_seq, []),
-    {ok, Seqs} = fabric_util:recv(Workers, #shard.ref,
-        fun collect_update_seqs/3, fabric_dict:init(Workers, -1)),
-    Seqs;
+    {ok, Info} = fabric:get_db_info(DbName),
+    couch_util:get_value(update_seq, Info);
 get_start_seq(_DbName, #changes_args{dir=fwd, since=Since}) ->
     Since.
 
-collect_update_seqs(Seq, Shard, Counters) when is_integer(Seq) ->
-    case fabric_dict:lookup_element(Shard, Counters) of
-    undefined ->
-        % already heard from someone else in this range
-        {ok, Counters};
-    -1 ->
-        C1 = fabric_dict:store(Shard, Seq, Counters),
-        C2 = fabric_view:remove_overlapping_shards(Shard, C1),
-        case fabric_dict:any(-1, C2) of
-        true ->
-            {ok, C2};
-        false ->
-            {stop, pack_seqs(C2)}
-        end
-    end.
-
 pending_count(Dict) ->
     fabric_dict:fold(fun
         (_Worker, Count, Acc) when is_integer(Count), is_integer(Acc) ->


[38/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Consistently log fabric worker timeouts

Write a log line for each worker that did not return a response
when a fabric request times out. The format of the log line is:

    fabric_worker_timeout ENDPOINT,NODE,SHARD_NAME

This is intended to be easily consumable by downstream tools
(e.g., Splunk).

BugzID: 26984


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/b1c0030f
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/b1c0030f
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/b1c0030f

Branch: refs/heads/windsor-merge
Commit: b1c0030fa3d960e9ea6e3109204ffee185459b8e
Parents: bfaa56b
Author: Mike Wallace <mi...@googlemail.com>
Authored: Sat Jan 11 22:11:20 2014 +0000
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:43 2014 +0100

----------------------------------------------------------------------
 include/fabric.hrl              |  6 ++++++
 src/fabric_db_create.erl        | 10 +++++++++-
 src/fabric_db_delete.erl        |  7 ++++++-
 src/fabric_db_doc_count.erl     |  9 +++++++--
 src/fabric_db_info.erl          | 10 ++++++++++
 src/fabric_db_meta.erl          | 24 ++++++++++++++++++------
 src/fabric_dict.erl             |  3 +++
 src/fabric_doc_missing_revs.erl | 11 +++++++++--
 src/fabric_doc_open.erl         |  3 +++
 src/fabric_doc_open_revs.erl    |  3 +++
 src/fabric_doc_update.erl       |  4 +++-
 src/fabric_group_info.erl       |  9 +++++++--
 src/fabric_util.erl             | 16 ++++++++++------
 src/fabric_view_all_docs.erl    |  9 ++++++++-
 src/fabric_view_changes.erl     | 10 ++++++++++
 src/fabric_view_map.erl         | 10 +++++++++-
 src/fabric_view_reduce.erl      | 10 +++++++++-
 17 files changed, 130 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/include/fabric.hrl
----------------------------------------------------------------------
diff --git a/include/fabric.hrl b/include/fabric.hrl
index 94769bd..abbc4ad 100644
--- a/include/fabric.hrl
+++ b/include/fabric.hrl
@@ -32,5 +32,11 @@
     user_acc
 }).
 
+-record(stream_acc, {
+    workers,
+    start_fun,
+    replacements
+}).
+
 -record(view_row, {key, id, value, doc, worker}).
 -record(change, {key, id, value, deleted=false, doc, worker}).

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/src/fabric_db_create.erl
----------------------------------------------------------------------
diff --git a/src/fabric_db_create.erl b/src/fabric_db_create.erl
index 5a8334f..8b9d32a 100644
--- a/src/fabric_db_create.erl
+++ b/src/fabric_db_create.erl
@@ -73,6 +73,9 @@ create_shard_files(Shards) ->
     try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Workers) of
     {error, file_exists} ->
         file_exists;
+    {timeout, DefunctWorkers} ->
+        fabric_util:log_timeout(DefunctWorkers, "create_db"),
+        {error, timeout};
     _ ->
         ok
     after
@@ -104,7 +107,12 @@ create_shard_db_doc(Doc) ->
     Workers = fabric_util:submit_jobs(Shards, create_shard_db_doc, [Doc]),
     Acc0 = {length(Shards), fabric_dict:init(Workers, nil)},
     try fabric_util:recv(Workers, #shard.ref, fun handle_db_update/3, Acc0) of
-    {timeout, _} ->
+    {timeout, {_, WorkersDict}} ->
+        DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
+        fabric_util:log_timeout(
+            DefunctWorkers,
+            "create_shard_db_doc"
+        ),
         {error, timeout};
     Else ->
         Else

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/src/fabric_db_delete.erl
----------------------------------------------------------------------
diff --git a/src/fabric_db_delete.erl b/src/fabric_db_delete.erl
index 2087f44..9ba55fb 100644
--- a/src/fabric_db_delete.erl
+++ b/src/fabric_db_delete.erl
@@ -43,7 +43,12 @@ delete_shard_db_doc(Doc) ->
     Workers = fabric_util:submit_jobs(Shards, delete_shard_db_doc, [Doc]),
     Acc0 = {length(Shards), fabric_dict:init(Workers, nil)},
     try fabric_util:recv(Workers, #shard.ref, fun handle_db_update/3, Acc0) of
-    {timeout, _} ->
+    {timeout, {_, WorkersDict}} ->
+        DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
+        fabric_util:log_timeout(
+            DefunctWorkers,
+            "delete_shard_db_doc"
+        ),
         {error, timeout};
     Else ->
         Else

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/src/fabric_db_doc_count.erl
----------------------------------------------------------------------
diff --git a/src/fabric_db_doc_count.erl b/src/fabric_db_doc_count.erl
index dcc32aa..a0fd3ec 100644
--- a/src/fabric_db_doc_count.erl
+++ b/src/fabric_db_doc_count.erl
@@ -23,8 +23,13 @@ go(DbName) ->
     Workers = fabric_util:submit_jobs(Shards, get_doc_count, []),
     RexiMon = fabric_util:create_monitors(Shards),
     Acc0 = {fabric_dict:init(Workers, nil), 0},
-    try
-        fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0)
+    try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+    {timeout, {WorkersDict, _}} ->
+        DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
+        fabric_util:log_timeout(DefunctWorkers, "get_doc_count"),
+        {error, timeout};
+    Else ->
+        Else
     after
         rexi_monitor:stop(RexiMon)
     end.

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/src/fabric_db_info.erl
----------------------------------------------------------------------
diff --git a/src/fabric_db_info.erl b/src/fabric_db_info.erl
index de88632..8a41cde 100644
--- a/src/fabric_db_info.erl
+++ b/src/fabric_db_info.erl
@@ -26,6 +26,16 @@ go(DbName) ->
     try
         case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
             {ok, Acc} -> {ok, Acc};
+            {timeout, {WorkersDict, _}} ->
+                DefunctWorkers = fabric_util:remove_done_workers(
+                    WorkersDict,
+                    nil
+                ),
+                fabric_util:log_timeout(
+                    DefunctWorkers,
+                    "get_db_info"
+                ),
+                {error, timeout};
             {error, Error} -> throw(Error)
         end
     after

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/src/fabric_db_meta.erl
----------------------------------------------------------------------
diff --git a/src/fabric_db_meta.erl b/src/fabric_db_meta.erl
index 2d5ba8d..1550062 100644
--- a/src/fabric_db_meta.erl
+++ b/src/fabric_db_meta.erl
@@ -29,19 +29,22 @@ set_revs_limit(DbName, Limit, Options) ->
     Shards = mem3:shards(DbName),
     Workers = fabric_util:submit_jobs(Shards, set_revs_limit, [Limit, Options]),
     Handler = fun handle_revs_message/3,
-    Waiting = length(Workers) - 1,
-    case fabric_util:recv(Workers, #shard.ref, Handler, Waiting) of
+    Acc0 = {Workers, length(Workers) - 1},
+    case fabric_util:recv(Workers, #shard.ref, Handler, Acc0) of
     {ok, ok} ->
         ok;
+    {timeout, {DefunctWorkers, _}} ->
+        fabric_util:log_timeout(DefunctWorkers, "set_revs_limit"),
+        {error, timeout};
     Error ->
         Error
     end.
 
-handle_revs_message(ok, _, 0) ->
+handle_revs_message(ok, _, {_Workers, 0}) ->
     {stop, ok};
-handle_revs_message(ok, _, Waiting) ->
-    {ok, Waiting - 1};
-handle_revs_message(Error, _, _Waiting) ->
+handle_revs_message(ok, Worker, {Workers, Waiting}) ->
+    {ok, {lists:delete(Worker, Workers), Waiting - 1}};
+handle_revs_message(Error, _, _Acc) ->
     {error, Error}.
 
 
@@ -61,6 +64,9 @@ set_security(DbName, SecObj, Options) ->
             ok -> ok;
             Error -> Error
         end;
+    {timeout, #acc{workers=DefunctWorkers}} ->
+        fabric_util:log_timeout(DefunctWorkers, "set_security"),
+        {error, timeout};
     Error ->
         Error
     after
@@ -133,6 +139,12 @@ get_all_security(DbName, Options) ->
         {ok, SecObjs};
     {ok, _} ->
         {error, no_majority};
+    {timeout, #acc{workers=DefunctWorkers}} ->
+        fabric_util:log_timeout(
+            DefunctWorkers,
+            "get_all_security"
+        ),
+        {error, timeout};
     Error ->
         Error
     after

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/src/fabric_dict.erl
----------------------------------------------------------------------
diff --git a/src/fabric_dict.erl b/src/fabric_dict.erl
index f88ca97..ec2e25c 100644
--- a/src/fabric_dict.erl
+++ b/src/fabric_dict.erl
@@ -52,3 +52,6 @@ filter(Fun, Dict) ->
 
 fold(Fun, Acc0, Dict) ->
     orddict:fold(Fun, Acc0, Dict).
+
+to_list(Dict) ->
+    orddict:to_list(Dict).

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/src/fabric_doc_missing_revs.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_missing_revs.erl b/src/fabric_doc_missing_revs.erl
index ec154ee..1687111 100644
--- a/src/fabric_doc_missing_revs.erl
+++ b/src/fabric_doc_missing_revs.erl
@@ -29,8 +29,15 @@ go(DbName, AllIdsRevs, Options) ->
     ResultDict = dict:from_list([{Id, {{nil,Revs},[]}} || {Id, Revs} <- AllIdsRevs]),
     RexiMon = fabric_util:create_monitors(Workers),
     Acc0 = {length(Workers), ResultDict, Workers},
-    try
-        fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0)
+    try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+    {timeout, {_, _, DefunctWorkers}} ->
+        fabric_util:log_timeout(
+            DefunctWorkers,
+            "get_missing_revs"
+        ),
+        {error, timeout};
+    Else ->
+        Else
     after
         rexi_monitor:stop(RexiMon)
     end.

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/src/fabric_doc_open.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_open.erl b/src/fabric_doc_open.erl
index 26bd2a3..1805c80 100644
--- a/src/fabric_doc_open.erl
+++ b/src/fabric_doc_open.erl
@@ -47,6 +47,9 @@ go(DbName, Id, Options) ->
     {ok, #acc{}=Acc} ->
         Reply = handle_response(Acc),
         format_reply(Reply, SuppressDeletedDoc);
+    {timeout, #acc{workers=DefunctWorkers}} ->
+        fabric_util:log_timeout(DefunctWorkers, "open_doc"),
+        {error, timeout};
     Error ->
         Error
     after

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/src/fabric_doc_open_revs.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_open_revs.erl b/src/fabric_doc_open_revs.erl
index 31d7616..b24924d 100644
--- a/src/fabric_doc_open_revs.erl
+++ b/src/fabric_doc_open_revs.erl
@@ -47,6 +47,9 @@ go(DbName, Id, Revs, Options) ->
     try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, State) of
     {ok, {ok, Reply}} ->
         {ok, Reply};
+    {timeout, #state{workers=DefunctWorkers}} ->
+        fabric_util:log_timeout(DefunctWorkers, "open_revs"),
+        {error, timeout};
     Else ->
         Else
     after

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/src/fabric_doc_update.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_update.erl b/src/fabric_doc_update.erl
index 9e2ce50..da8eeed 100644
--- a/src/fabric_doc_update.erl
+++ b/src/fabric_doc_update.erl
@@ -39,7 +39,9 @@ go(DbName, AllDocs0, Opts) ->
     {ok, {Health, Results}} when Health =:= ok; Health =:= accepted ->
         {Health, [R || R <- couch_util:reorder_results(AllDocs, Results), R =/= noreply]};
     {timeout, Acc} ->
-        {_, _, W1, _, DocReplDict} = Acc,
+        {_, _, W1, GroupedDocs1, DocReplDict} = Acc,
+        {DefunctWorkers, _} = lists:unzip(GroupedDocs1),
+        fabric_util:log_timeout(DefunctWorkers, "update_docs"),
         {Health, _, Resp} = dict:fold(fun force_reply/3, {ok, W1, []},
             DocReplDict),
         {Health, [R || R <- couch_util:reorder_results(AllDocs, Resp), R =/= noreply]};

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/src/fabric_group_info.erl
----------------------------------------------------------------------
diff --git a/src/fabric_group_info.erl b/src/fabric_group_info.erl
index 5325f76..b5ee2b2 100644
--- a/src/fabric_group_info.erl
+++ b/src/fabric_group_info.erl
@@ -27,8 +27,13 @@ go(DbName, #doc{id=DDocId}) ->
     Workers = fabric_util:submit_jobs(Shards, group_info, [DDocId]),
     RexiMon = fabric_util:create_monitors(Shards),
     Acc0 = {fabric_dict:init(Workers, nil), []},
-    try
-        fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0)
+    try fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0) of
+    {timeout, {WorkersDict, _}} ->
+        DefunctWorkers = fabric_util:remove_done_workers(WorkersDict, nil),
+        fabric_util:log_timeout(DefunctWorkers, "group_info"),
+        {error, timeout};
+    Else ->
+        Else
     after
         rexi_monitor:stop(RexiMon)
     end.

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/src/fabric_util.erl
----------------------------------------------------------------------
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index 5f59725..18ff578 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -17,6 +17,7 @@
         remove_down_workers/2, doc_id_and_rev/1]).
 -export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0]).
 -export([stream_start/2, stream_start/4]).
+-export([log_timeout/2, remove_done_workers/2]).
 
 -compile({inline, [{doc_id_and_rev,1}]}).
 
@@ -25,12 +26,6 @@
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
--record(stream_acc, {
-    workers,
-    start_fun,
-    replacements
-}).
-
 remove_down_workers(Workers, BadNode) ->
     Filter = fun(#shard{node = Node}, _) -> Node =/= BadNode end,
     NewWorkers = fabric_dict:filter(Filter, Workers),
@@ -155,6 +150,15 @@ timeout(Type, Default) ->
         N -> list_to_integer(N)
     end.
 
+log_timeout(Workers, EndPoint) ->
+    lists:map(fun(#shard{node=Dest, name=Name}) ->
+        Fmt = "fabric_worker_timeout ~s,~p,~p",
+        ?LOG_ERROR(Fmt, [EndPoint, Dest, Name])
+    end, Workers).
+
+remove_done_workers(Workers, WaitingIndicator) ->
+    [W || {W, WI} <- fabric_dict:to_list(Workers), WI == WaitingIndicator].
+
 get_db(DbName) ->
     get_db(DbName, []).
 

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/src/fabric_view_all_docs.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl
index d4dec09..e7cc67c 100644
--- a/src/fabric_view_all_docs.erl
+++ b/src/fabric_view_all_docs.erl
@@ -33,7 +33,14 @@ go(DbName, Options, #mrargs{keys=undefined} = QueryArgs, Callback, Acc) ->
                 after
                     fabric_util:cleanup(Workers)
                 end;
-            {timeout, _} ->
+            {timeout, NewState} ->
+                DefunctWorkers = fabric_util:remove_done_workers(
+                    NewState#stream_acc.workers, waiting
+                ),
+                fabric_util:log_timeout(
+                    DefunctWorkers,
+                    "all_docs"
+                ),
                 Callback({error, timeout}, Acc);
             {error, Error} ->
                 Callback({error, Error}, Acc)

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 34670c9..44639d9 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -174,6 +174,16 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
                 after
                     fabric_util:cleanup(Workers)
                 end;
+            {timeout, NewState} ->
+                DefunctWorkers = fabric_util:remove_done_workers(
+                    NewState#stream_acc.workers,
+                    waiting
+                ),
+                fabric_util:log_timeout(
+                    DefunctWorkers,
+                    "changes"
+                ),
+                throw({error, timeout});
             {error, Reason} ->
                 throw({error, Reason});
             Else ->

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/src/fabric_view_map.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index 1201daf..1977888 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -40,7 +40,15 @@ go(DbName, DDoc, View, Args, Callback, Acc) ->
                 after
                     fabric_util:cleanup(Workers)
                 end;
-            {timeout, _} ->
+            {timeout, NewState} ->
+                DefunctWorkers = fabric_util:remove_done_workers(
+                    NewState#stream_acc.workers,
+                    waiting
+                ),
+                fabric_util:log_timeout(
+                    DefunctWorkers,
+                    "map_view"
+                ),
                 Callback({error, timeout}, Acc);
             {error, Error} ->
                 Callback({error, Error}, Acc)

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/b1c0030f/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 2e0d1f2..583c8ff 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -41,7 +41,15 @@ go(DbName, DDoc, VName, Args, Callback, Acc, {red, {_, Lang, _}, _}=VInfo) ->
                 after
                     fabric_util:cleanup(Workers)
                 end;
-            {timeout, _} ->
+            {timeout, NewState} ->
+                DefunctWorkers = fabric_util:remove_done_workers(
+                    NewState#stream_acc.workers,
+                    waiting
+                ),
+                fabric_util:log_timeout(
+                    DefunctWorkers,
+                    "reduce_view"
+                ),
                 Callback({error, timeout}, Acc);
             {error, Error} ->
                 Callback({error, Error}, Acc)


[29/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Calculate safe worker seq on shard replacement

When a shard is being replaced in the changes feed the RPC workers for
the other shard copies can leverage epoch and checkpoint information to
start from a sequence greater than zero.  This commit (and an associated
one in mem3) makes that possible, though we still need to teach the
coordinator to share the fact that the worker is being summoned to
replace another copy that previously contributed to the feed.

BugzID: 22698


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/5d524363
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/5d524363
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/5d524363

Branch: refs/heads/windsor-merge
Commit: 5d52436339d395d324fdf12722b32cbd717b5742
Parents: 5e2d376
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Sun Nov 17 12:01:05 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 30 ++++++++++++++++++++++++++++++
 1 file changed, 30 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/5d524363/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 6f8e810..b0ce9b1 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -420,6 +420,18 @@ calculate_start_seq(Db, Node, {Seq, Uuid}) ->
             %% The file was rebuilt, most likely in a different
             %% order, so rewind.
             0
+    end;
+calculate_start_seq(Db, _Node, {replace, OriginalNode, Uuid, Seq}) ->
+    case is_prefix(Uuid, couch_db:get_uuid(Db)) of
+        true ->
+            start_seq(get_epochs(Db), OriginalNode, Seq);
+        false ->
+            %% Scan history looking for an entry with
+            %%  * target_node == TargetNode
+            %%  * target_uuid == TargetUUID
+            %%  * target_seq  =< TargetSeq
+            %% If such an entry is found, stream from associated source_seq
+            mem3_rep:find_source_seq(Db, OriginalNode, Uuid, Seq)
     end.
 
 is_prefix(Pattern, Subject) ->
@@ -436,6 +448,24 @@ owner_of(Seq, [{EpochNode, EpochSeq} | _Rest]) when Seq > EpochSeq ->
 owner_of(Seq, [_ | Rest]) ->
     owner_of(Seq, Rest).
 
+get_epochs(Db) ->
+    Epochs = couch_db:get_epochs(Db),
+    validate_epochs(Epochs),
+    Epochs.
+
+start_seq([{OrigNode, EpochSeq} | _], OrigNode, Seq) when Seq > EpochSeq ->
+    %% OrigNode is the owner of the Seq so we can safely stream from there
+    Seq;
+start_seq([{_, NewSeq}, {OrigNode, _} | _], OrigNode, Seq) when Seq > NewSeq ->
+    %% We transferred this file before Seq was written on OrigNode, so we need
+    %% to stream from the beginning of the next epoch. Note that it is _not_
+    %% necessary for the current node to own the epoch beginning at NewSeq
+    NewSeq;
+start_seq([_ | Rest], OrigNode, Seq) ->
+    start_seq(Rest, OrigNode, Seq);
+start_seq([], OrigNode, Seq) ->
+    erlang:error({epoch_mismatch, OrigNode, Seq}).
+
 validate_epochs(Epochs) ->
     %% Assert uniqueness.
     case length(Epochs) == length(lists:ukeysort(2, Epochs)) of


[04/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Implement fabric_view:check_down_shards/2

This replaces fabric_view:remove_down_shards/2 with a check to see if a
rexi_DOWN message affects any of the worker shards. This is because we
monitor all the shards but are only handling messages from a single ring
which may not have included a node in the initial shards list.

We no longer need to remove shards on down nodes because, once our stream
handling functions are running, as soon as we see a shard on a downed node
we know that progress is no longer possible under our current behavior.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/dc1f0141
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/dc1f0141
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/dc1f0141

Branch: refs/heads/windsor-merge
Commit: dc1f01416ccf14e8836f4da42ffbfb1da4524ca1
Parents: bea3052
Author: Paul J. Davis <pa...@gmail.com>
Authored: Tue Sep 10 20:56:27 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:40 2014 +0100

----------------------------------------------------------------------
 src/fabric_view.erl | 23 +++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/dc1f0141/src/fabric_view.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view.erl b/src/fabric_view.erl
index 117dead..be41ce1 100644
--- a/src/fabric_view.erl
+++ b/src/fabric_view.erl
@@ -14,24 +14,27 @@
 
 -export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
     transform_row/1, keydict/1, extract_view/4, get_shards/2,
-    remove_down_shards/2]).
+    check_down_shards/2]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
--spec remove_down_shards(#collector{}, node()) ->
+%% @doc Check if a downed node affects any of our workers
+-spec check_down_shards(#collector{}, node()) ->
     {ok, #collector{}} | {error, any()}.
-remove_down_shards(Collector, BadNode) ->
+check_down_shards(Collector, BadNode) ->
     #collector{callback=Callback, counters=Counters, user_acc=Acc} = Collector,
-    case fabric_util:remove_down_workers(Counters, BadNode) of
-    {ok, NewCounters} ->
-        {ok, Collector#collector{counters = NewCounters}};
-    error ->
-        Reason = {nodedown, <<"progress not possible">>},
-        Callback({error, Reason}, Acc),
-        {error, Reason}
+    Filter = fun(#shard{node = Node}, _) -> Node == BadNode end,
+    BadCounters = fabric_dict:filter(Filter, Counters),
+    case fabric_dict:size(BadCounters) > 0 of
+        true ->
+            Reason = {nodedown, <<"progress not possible">>},
+            Callback({error, Reason}, Acc),
+            {error, Reason};
+        false ->
+            {ok, Collector}
     end.
 
 %% @doc looks for a fully covered keyrange in the list of counters


[17/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Allow shard replacements in stream_start

This adds fabric_util:stream_start/4 which accepts a start function and
a proplist of replacement shards keyed by range. If a worker exits due
to maintenance mode and prevents progress from being made, the list of
replacement shards is searched for any replacements for the range. If
any are found the start function is called for each possible
replacement. The start function takes the possible replacement and
should return the new worker.

BugzId: 20423


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/f2256977
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/f2256977
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/f2256977

Branch: refs/heads/windsor-merge
Commit: f225697757ed07fd344016f1a6a85674b800686c
Parents: 0f45478
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Sep 12 10:23:48 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_util.erl | 82 +++++++++++++++++++++++++++++++++++-------------
 1 file changed, 60 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/f2256977/src/fabric_util.erl
----------------------------------------------------------------------
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index f11abe3..fb6cd8b 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -16,13 +16,19 @@
         update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
         remove_down_workers/2]).
 -export([request_timeout/0]).
--export([stream_start/2]).
+-export([stream_start/2, stream_start/4]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("eunit/include/eunit.hrl").
 
+-record(stream_acc, {
+    workers,
+    start_fun,
+    replacements
+}).
+
 remove_down_workers(Workers, BadNode) ->
     Filter = fun(#shard{node = Node}, _) -> Node =/= BadNode end,
     NewWorkers = fabric_dict:filter(Filter, Workers),
@@ -45,12 +51,19 @@ submit_jobs(Shards, Module, EndPoint, ExtraArgs) ->
 cleanup(Workers) ->
     [rexi:kill(Node, Ref) || #shard{node=Node, ref=Ref} <- Workers].
 
-stream_start(Workers0, Keypos) ->
+stream_start(Workers, Keypos) ->
+    stream_start(Workers, Keypos, undefined, undefined).
+
+stream_start(Workers0, Keypos, StartFun, Replacements) ->
     Fun = fun handle_stream_start/3,
-    Acc = fabric_dict:init(Workers0, waiting),
+    Acc = #stream_acc{
+        workers = fabric_dict:init(Workers0, waiting),
+        start_fun = StartFun,
+        replacements = Replacements
+    },
     Timeout = request_timeout(),
     case rexi_utils:recv(Workers0, Keypos, Fun, Acc, Timeout, infinity) of
-        {ok, Workers} ->
+        {ok, #stream_acc{workers=Workers}} ->
             true = fabric_view:is_progress_possible(Workers),
             AckedWorkers = fabric_dict:fold(fun(Worker, From, WorkerAcc) ->
                 rexi:stream_start(From),
@@ -61,36 +74,61 @@ stream_start(Workers0, Keypos) ->
             Else
     end.
 
-handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
-    case fabric_util:remove_down_workers(State, NodeRef) of
-    {ok, NewState} ->
-        {ok, NewState};
+handle_stream_start({rexi_DOWN, _, {_, NodeRef}, _}, _, St) ->
+    case fabric_util:remove_down_workers(St#stream_acc.workers, NodeRef) of
+    {ok, Workers} ->
+        {ok, St#stream_acc{workers=Workers}};
     error ->
         Reason = {nodedown, <<"progress not possible">>},
         {error, Reason}
     end;
-handle_stream_start({rexi_EXIT, Reason}, Worker, State) ->
-    NewState = fabric_dict:erase(Worker, State),
-    case fabric_view:is_progress_possible(NewState) of
-    true ->
-        {ok, NewState};
-    false ->
+handle_stream_start({rexi_EXIT, Reason}, Worker, St) ->
+    Workers = fabric_dict:erase(Worker, St#stream_acc.workers),
+    case {fabric_view:is_progress_possible(Workers), Reason} of
+    {true, _} ->
+        {ok, St#stream_acc{workers=Workers}};
+    {false, {maintenance_mode, _Node}} ->
+        % Check if we have replacements for this range
+        % and start the new workers if so.
+        case lists:keytake(Worker#shard.range, 1, St#stream_acc.replacements) of
+            {value, {_Range, WorkerReplacements}, NewReplacements} ->
+                FinalWorkers = lists:foldl(fun(Repl, NewWorkers) ->
+                    NewWorker = (St#stream_acc.start_fun)(Repl),
+                    fabric_dict:store(NewWorker, waiting, NewWorkers)
+                end, Workers, WorkerReplacements),
+                % Assert that our replaced worker provides us
+                % the opportunity to make progress.
+                true = fabric_view:is_progress_possible(FinalWorkers),
+                NewRefs = fabric_dict:fetch_keys(FinalWorkers),
+                {new_refs, NewRefs, St#stream_acc{
+                    workers=FinalWorkers,
+                    replacements=NewReplacements
+                }};
+            false ->
+                % If progress isn't possible and we don't have any
+                % replacements then we're dead in the water.
+                Error = {nodedown, <<"progress not possible">>},
+                {error, Error}
+        end;
+    {false, _} ->
         {error, fabric_util:error_info(Reason)}
     end;
-handle_stream_start(rexi_STREAM_INIT, {Worker, From}, State) ->
-    case fabric_dict:lookup_element(Worker, State) of
+handle_stream_start(rexi_STREAM_INIT, {Worker, From}, St) ->
+    case fabric_dict:lookup_element(Worker, St#stream_acc.workers) of
     undefined ->
         % This worker lost the race with other partition copies, terminate
         rexi:stream_cancel(From),
-        {ok, State};
+        {ok, St};
     waiting ->
         % Don't ack the worker yet so they don't start sending us
         % rows until we're ready
-        NewState0 = fabric_dict:store(Worker, From, State),
-        NewState1 = fabric_view:remove_overlapping_shards(Worker, NewState0),
-        case fabric_dict:any(waiting, NewState1) of
-            true -> {ok, NewState1};
-            false -> {stop, NewState1}
+        Workers0 = fabric_dict:store(Worker, From, St#stream_acc.workers),
+        Workers1 = fabric_view:remove_overlapping_shards(Worker, Workers0),
+        case fabric_dict:any(waiting, Workers1) of
+            true ->
+                {ok, St#stream_acc{workers=Workers1}};
+            false ->
+                {stop, St#stream_acc{workers=Workers1}}
         end
     end;
 handle_stream_start(Else, _, _) ->


[34/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Adjust replacement rewind exclusion

We want to avoid using existing replacement information for down
shards. The is_integer guard didn't work because the sequences are
{UUID, Seq} tuples, so we pattern match on that tuple shape
instead.

BugzId: 22698


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/16e020f6
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/16e020f6
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/16e020f6

Branch: refs/heads/windsor-merge
Commit: 16e020f647a457e86723d744a1295d82136550f1
Parents: cb16735
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Dec 6 14:44:44 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/16e020f6/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 6d5dd24..ed6dfb2 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -136,7 +136,10 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
     StartFun = fun(#shard{name=Name, node=N, range=R0}=Shard) ->
         %% Find the original shard copy in the Seqs array
         case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, Seqs) of
-            [{#shard{node = OldNode}, OldSeq} | _] when is_integer(OldSeq) ->
+            % The {_, _}=OldSeq pattern match is so that we don't
+            % accidentally try and replace based on the generated
+            % {replace, _, _, _} tuples.
+            [{#shard{node = OldNode}, {_, _}=OldSeq} | _] ->
                 SeqArg = make_replacement_arg(OldNode, OldSeq);
             _ ->
                 % TODO this clause is probably unreachable in the N>2


[25/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Handle fabric upgrade

We still need to handle direct document passing for older
clients. The new Id+Rev protocol will be a part of a
future upgrade.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/e8cf1bb4
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/e8cf1bb4
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/e8cf1bb4

Branch: refs/heads/windsor-merge
Commit: e8cf1bb40c7de53357450df397d9e1335994e8a4
Parents: df0c3a3
Author: Brian Mitchell <br...@p2p.io>
Authored: Thu Oct 24 10:12:58 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl         | 4 ++++
 src/fabric_view_map.erl    | 2 +-
 src/fabric_view_reduce.erl | 2 +-
 3 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/e8cf1bb4/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index cee30f2..6f8e810 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -69,6 +69,8 @@ map_view(DbName, DDocInfo, ViewName, Args0) ->
 
 map_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
     {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev),
+    map_view(DbName, DDoc, ViewName, Args0, DbOptions);
+map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     set_io_priority(DbName, DbOptions),
     Args = fix_skip_and_limit(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
@@ -81,6 +83,8 @@ reduce_view(DbName, DDocInfo, ViewName, Args0) ->
 
 reduce_view(DbName, {DDocId, Rev}, ViewName, Args0, DbOptions) ->
     {ok, DDoc} = ddoc_cache:open_doc(mem3:dbname(DbName), DDocId, Rev),
+    reduce_view(DbName, DDoc, ViewName, Args0, DbOptions);
+reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
     set_io_priority(DbName, DbOptions),
     Args = fix_skip_and_limit(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/e8cf1bb4/src/fabric_view_map.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index 1201daf..d2c010a 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -26,7 +26,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
 go(DbName, DDoc, View, Args, Callback, Acc) ->
     Shards = fabric_view:get_shards(DbName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
-    RPCArgs = [fabric_util:doc_id_and_rev(DDoc), View, Args],
+    RPCArgs = [DDoc, View, Args],
     StartFun = fun(Shard) ->
         hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
     end,

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/e8cf1bb4/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 2e0d1f2..67c36a3 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -25,7 +25,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
 
 go(DbName, DDoc, VName, Args, Callback, Acc, {red, {_, Lang, _}, _}=VInfo) ->
     RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
-    RPCArgs = [fabric_util:doc_id_and_rev(DDoc), VName, Args],
+    RPCArgs = [DDoc, VName, Args],
     Shards = fabric_view:get_shards(DbName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
     StartFun = fun(Shard) ->


[30/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Use smarter replacements for stream_start failures

This patch enhances the StartFun to search the original list of workers
for the sequence from which the worker being replaced had started.

BugzID: 22698


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/90411dd9
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/90411dd9
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/90411dd9

Branch: refs/heads/windsor-merge
Commit: 90411dd9cd3003461911e286b2ddbd62269d917c
Parents: f4616ee
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Fri Nov 22 22:03:22 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/90411dd9/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 4ee87e4..76c8c16 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -133,8 +133,20 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
     end, unpack_seqs(PackedSeqs, DbName)),
     {Workers0, _} = lists:unzip(Seqs),
     Repls = fabric_view:get_shard_replacements(DbName, Workers0),
-    StartFun = fun(#shard{name=Name, node=N}=Shard) ->
-        Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, 0]}),
+    StartFun = fun(#shard{name=Name, node=N, range=R0}=Shard) ->
+        %% Find the original shard copy in the Seqs array
+        case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, Seqs) of
+            [{#shard{node = OldNode}, OldSeq} | _] when is_integer(OldSeq) ->
+                SeqArg = make_replacement_arg(OldNode, OldSeq);
+            _ ->
+                % TODO this clause is probably unreachable in the N>2
+                % case because we compute replacements only if a shard has one
+                % in the original set.
+                couch_log:error("Streaming ~s from zero while replacing ~p",
+                    [Name, PackedSeqs]),
+                SeqArg = 0
+        end,
+        Ref = rexi:cast(N, {fabric_rpc, changes, [Name, ChangesArgs, SeqArg]}),
         Shard#shard{ref = Ref}
     end,
     RexiMon = fabric_util:create_monitors(Workers0),


[12/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Remove test for old reduce view behavior

This test is no longer valid because it's checking that a complete
message removes other workers from the same range, which no longer
happens. This is because fabric_util:stream_start/2 now removes all
workers and returns us a single complete shard ring.

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/6462e847
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/6462e847
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/6462e847

Branch: refs/heads/windsor-merge
Commit: 6462e84765fefb31dee9ca9c2b9d48df2fa6000b
Parents: df9cb85
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Sep 4 16:23:02 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_reduce.erl | 15 ---------------
 1 file changed, 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/6462e847/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 42b889b..16d6fb1 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -124,21 +124,6 @@ handle_message(complete, Worker, #collector{counters = Counters0} = State) ->
     C1 = fabric_dict:update_counter(Worker, 1, Counters0),
     fabric_view:maybe_send_row(State#collector{counters = C1}).
 
-complete_worker_test() ->
-    meck:new(config),
-    meck:expect(config, get, fun("rexi","server_per_node",_) -> rexi_server end),
-    Shards =
-        mem3_util:create_partition_map("foo",3,3,[node(),node(),node()]),
-    Workers = lists:map(fun(#shard{} = Shard) ->
-                            Ref = make_ref(),
-                            Shard#shard{ref = Ref}
-                        end,
-                        Shards),
-    State = #collector{counters=fabric_dict:init(Workers,0)},
-    {ok, NewState} = handle_message(complete, lists:nth(2,Workers), State),
-    meck:unload(config),
-    ?assertEqual(orddict:size(NewState#collector.counters),length(Workers) - 2).
-
 os_proc_needed(<<"_", _/binary>>) -> false;
 os_proc_needed(_) -> true.
 


[10/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Make all timeouts configurable

BugzID: 24302


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/ec26cbab
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/ec26cbab
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/ec26cbab

Branch: refs/heads/windsor-merge
Commit: ec26cbabe767f95f1e67a5c0eb7ab11c7ce18206
Parents: a505676
Author: Robert Newson <ro...@cloudant.com>
Authored: Mon Oct 21 18:05:11 2013 +0100
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_doc_attachments.erl |  6 ++++--
 src/fabric_util.erl            | 13 +++++++++++--
 src/fabric_view_all_docs.erl   |  6 ++++--
 3 files changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/ec26cbab/src/fabric_doc_attachments.erl
----------------------------------------------------------------------
diff --git a/src/fabric_doc_attachments.erl b/src/fabric_doc_attachments.erl
index af56137..a703db9 100644
--- a/src/fabric_doc_attachments.erl
+++ b/src/fabric_doc_attachments.erl
@@ -34,11 +34,12 @@ receiver(Req, Length) when is_integer(Length) ->
     Middleman = spawn(fun() -> middleman(Req, Length) end),
     fun() ->
         Middleman ! {self(), gimme_data},
+        Timeout = fabric_util:attachments_timeout(),
         receive
             {Middleman, Data} ->
                 rexi:reply(attachment_chunk_received),
                 iolist_to_binary(Data)
-        after 600000 ->
+        after Timeout ->
             exit(timeout)
         end
     end;
@@ -64,6 +65,7 @@ maybe_send_continue(#httpd{mochi_req = MochiReq} = Req) ->
 
 write_chunks(MiddleMan, ChunkFun) ->
     MiddleMan ! {self(), gimme_data},
+    Timeout = fabric_util:attachments_timeout(),
     receive
     {MiddleMan, ChunkRecordList} ->
         rexi:reply(attachment_chunk_received),
@@ -71,7 +73,7 @@ write_chunks(MiddleMan, ChunkFun) ->
         continue -> write_chunks(MiddleMan, ChunkFun);
         done -> ok
         end
-    after 600000 ->
+    after Timeout ->
         exit(timeout)
     end.
 

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/ec26cbab/src/fabric_util.erl
----------------------------------------------------------------------
diff --git a/src/fabric_util.erl b/src/fabric_util.erl
index fb6cd8b..344ead7 100644
--- a/src/fabric_util.erl
+++ b/src/fabric_util.erl
@@ -15,7 +15,7 @@
 -export([submit_jobs/3, submit_jobs/4, cleanup/1, recv/4, get_db/1, get_db/2, error_info/1,
         update_counter/3, remove_ancestors/2, create_monitors/1, kv/2,
         remove_down_workers/2]).
--export([request_timeout/0]).
+-export([request_timeout/0, attachments_timeout/0, all_docs_timeout/0]).
 -export([stream_start/2, stream_start/4]).
 
 -include_lib("fabric/include/fabric.hrl").
@@ -138,7 +138,16 @@ recv(Workers, Keypos, Fun, Acc0) ->
     rexi_utils:recv(Workers, Keypos, Fun, Acc0, request_timeout(), infinity).
 
 request_timeout() ->
-    case config:get("fabric", "request_timeout", "60000") of
+    timeout("request", "60000").
+
+all_docs_timeout() ->
+    timeout("all_docs", "10000").
+
+attachments_timeout() ->
+    timeout("attachments", "600000").
+
+timeout(Type, Default) ->
+    case config:get("fabric", Type ++ "_timeout", Default) of
         "infinity" -> infinity;
         N -> list_to_integer(N)
     end.

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/ec26cbab/src/fabric_view_all_docs.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl
index 8dc9173..28a1f91 100644
--- a/src/fabric_view_all_docs.erl
+++ b/src/fabric_view_all_docs.erl
@@ -68,13 +68,14 @@ go(DbName, Options, QueryArgs, Callback, Acc0) ->
         true -> lists:sublist(Keys2, Limit);
         false -> Keys2
     end,
+    Timeout = fabric_util:all_docs_timeout(),
     receive {'DOWN', Ref0, _, _, {ok, TotalRows}} ->
         {ok, Acc1} = Callback({meta, [{total, TotalRows}]}, Acc0),
         {ok, Acc2} = doc_receive_loop(
             Keys3, queue:new(), SpawnFun, MaxJobs, Callback, Acc1
         ),
         Callback(complete, Acc2)
-    after 10000 ->
+    after Timeout ->
         Callback(timeout, Acc0)
     end.
 
@@ -176,6 +177,7 @@ doc_receive_loop(Keys, Pids, SpawnFun, MaxJobs, Callback, AccIn) ->
         doc_receive_loop(RKeys, Pids1, SpawnFun, MaxJobs, Callback, AccIn);
     _ ->
         {{value, {Pid, Ref}}, RestPids} = queue:out(Pids),
+        Timeout = fabric_util:all_docs_timeout(),
         receive {'DOWN', Ref, process, Pid, #view_row{} = Row} ->
             case Callback(fabric_view:transform_row(Row), AccIn) of
             {ok, Acc} ->
@@ -186,7 +188,7 @@ doc_receive_loop(Keys, Pids, SpawnFun, MaxJobs, Callback, AccIn) ->
                 cancel_read_pids(RestPids),
                 {ok, Acc}
             end
-        after 10000 ->
+        after Timeout ->
             timeout
         end
     end.


[45/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Log errors when doc reads fail for _all_docs

This makes sure that we log any errors while opening a doc during an
_all_docs request. It also passes the failed DbName and DocId to the
user supplied callback function.

BugzId: 24580


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/8edaa88b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/8edaa88b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/8edaa88b

Branch: refs/heads/windsor-merge
Commit: 8edaa88b79a4ed7f75e8385c812c62aeb04b30c9
Parents: af20699
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Dec 12 09:25:39 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:43 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_all_docs.erl | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/8edaa88b/src/fabric_view_all_docs.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl
index cda748a..4403146 100644
--- a/src/fabric_view_all_docs.erl
+++ b/src/fabric_view_all_docs.erl
@@ -204,7 +204,18 @@ doc_receive_loop(Keys, Pids, SpawnFun, MaxJobs, Callback, AccIn) ->
         end
     end.
 
+
 open_doc(DbName, Options, Id, IncludeDocs) ->
+    try
+        open_doc_int(DbName, Options, Id, IncludeDocs)
+    catch Type:Reason ->
+        Stack = erlang:get_stacktrace(),
+        couch_log:error("_all_docs open error: ~s ~s :: ~w ~w", [
+                DbName, Id, {Type, Reason}, Stack]),
+        exit({Id, Reason})
+    end.
+
+open_doc_int(DbName, Options, Id, IncludeDocs) ->
     Row = case fabric:open_doc(DbName, Id, [deleted | Options]) of
     {not_found, missing} ->
         Doc = undefined,


[36/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Refactor payload to simplify coordinator

Two things we're doing here:

1) We're unwrapping {doc, {error, Reason}} on the RPC side.
2) We're already suppressing the doc field on the RPC side so we don't
have to duplicate the work on the coordinator.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/161f0889
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/161f0889
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/161f0889

Branch: refs/heads/windsor-merge
Commit: 161f0889b003a8c72912797491ad3d6cf983519f
Parents: 5e65ee0
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu Oct 17 14:03:47 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl          | 34 +++++++++-------------------------
 src/fabric_view_changes.erl | 26 ++++++++------------------
 2 files changed, 17 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/161f0889/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index c312aa7..cbc913e 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -306,43 +306,27 @@ changes_enumerator(DocInfo, {Db, _Seq, Args, Options}) ->
         filter = Acc
     } = Args,
     Conflicts = proplists:get_value(conflicts, Options, false),
-    #doc_info{high_seq=Seq, revs=[#rev_info{deleted=Del}|_]} = DocInfo,
+    #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del}|_]} = DocInfo,
     case [X || X <- couch_changes:filter(Db, DocInfo, Acc), X /= null] of
     [] ->
         {ok, {Db, Seq, Args, Options}};
     Results ->
         Opts = if Conflicts -> [conflicts]; true -> [] end,
-        ChangesRow = changes_row(Db, DocInfo, Results, Del, IncludeDocs, Opts),
+        ChangesRow = {change, [
+            {seq, {Seq, uuid(Db)}},
+            {id, Id},
+            {changes, Results},
+            {deleted, Del} |
+            if IncludeDocs -> [doc_member(Db, DocInfo, Opts)]; true -> [] end
+        ]},
         Go = rexi:sync_reply(ChangesRow),
         {Go, {Db, Seq, Args, Options}}
     end.
 
-changes_row(Db, #doc_info{id=Id, high_seq=Seq}=DI, Results, Del, true, Opts) ->
-    {change, [
-        {seq, {Seq, uuid(Db)}},
-        {id, Id},
-        {changes, Results},
-        {deleted, Del},
-        {doc, doc_member(Db, DI, Opts)}
-    ]};
-changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, true, _, _) ->
-    {change, [
-        {seq, {Seq, uuid(Db)}},
-        {id, Id},
-        {changes, Results},
-        {deleted, true}
-    ]};
-changes_row(Db, #doc_info{id=Id, high_seq=Seq}, Results, _, _, _) ->
-    {change, [
-        {seq, {Seq, uuid(Db)}},
-        {id, Id},
-        {changes, Results}
-    ]}.
-
 doc_member(Shard, DocInfo, Opts) ->
     case couch_db:open_doc(Shard, DocInfo, [deleted | Opts]) of
     {ok, Doc} ->
-        couch_doc:to_json_obj(Doc, []);
+        {doc, couch_doc:to_json_obj(Doc, [])};
     Error ->
         Error
     end.

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/161f0889/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index ed6dfb2..36cb945 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -224,7 +224,6 @@ handle_message(#change{} = Row, {Worker, From}, St) ->
 
 handle_message({change, Props}, {Worker, From}, St) ->
     #collector{
-        query_args = #changes_args{include_docs=IncludeDocs},
         callback = Callback,
         counters = S0,
         limit = Limit,
@@ -239,7 +238,7 @@ handle_message({change, Props}, {Worker, From}, St) ->
     true ->
         Props2 = lists:keyreplace(seq, 1, Props, {seq, null})
     end,
-    {Go, Acc} = Callback(changes_row(Props2, IncludeDocs), AccIn),
+    {Go, Acc} = Callback(changes_row(Props2), AccIn),
     rexi:stream_ack(From),
     {Go, St#collector{counters=S1, limit=Limit-1, user_acc=Acc}};
 
@@ -390,25 +389,16 @@ do_unpack_seqs(Opaque, DbName) ->
             Unpacked ++ [{R, 0} || R <- Replacements]
     end.
 
-changes_row(Props0, IncludeDocs) ->
-    Props1 = case {IncludeDocs, couch_util:get_value(doc, Props0)} of
-        {true, {error, Reason}} ->
-            % Transform {doc, {error, Reason}} to {error, Reason} for JSON
-            lists:keyreplace(doc, 1, Props0, {error, Reason});
-        {false, _} ->
-            lists:keydelete(doc, 1, Props0);
-        _ ->
-            Props0
-    end,
-    Props2 = case couch_util:get_value(deleted, Props1) of
+changes_row(Props0) ->
+    Props1 = case couch_util:get_value(deleted, Props0) of
         true ->
-            Props1;
+            Props0;
         _ ->
-            lists:keydelete(deleted, 1, Props1)
+            lists:keydelete(deleted, 1, Props0)
     end,
-    Allowed = [seq, id, changes, deleted, doc],
-    Props3 = lists:filter(fun({K,_V}) -> lists:member(K, Allowed) end, Props2),
-    {change, {Props3}}.
+    Allowed = [seq, id, changes, deleted, doc, error],
+    Props2 = lists:filter(fun({K,_V}) -> lists:member(K, Allowed) end, Props1),
+    {change, {Props2}}.
 
 find_replacement_shards(#shard{range=Range}, AllShards) ->
     % TODO make this moar betta -- we might have split or merged the partition


[48/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Don't attempt to replace replacement shards

The old test was looking for a {_, _} tuple but that doesn't account for
old since sequences. This just switches to using a negative selection.
The only bad behavior was that we were making lots of log messages about
rewinding from 0 in known conditions.

BugzId: 26125


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/1fced48b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/1fced48b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/1fced48b

Branch: refs/heads/windsor-merge
Commit: 1fced48beeeb7a4ed7b29bb318d747798b2ffe47
Parents: 2e98455
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Dec 12 12:58:35 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:43 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/1fced48b/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 480f6bb..8de01f1 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -142,10 +142,10 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
     StartFun = fun(#shard{name=Name, node=N, range=R0}=Shard) ->
         %% Find the original shard copy in the Seqs array
         case lists:dropwhile(fun({S, _}) -> S#shard.range =/= R0 end, Seqs) of
-            % The {_, _}=OldSeq pattern match is so that we don't
-            % accidentally try and replace based on the generated
-            % {replace, _, _, _} tuples.
-            [{#shard{node = OldNode}, {_, _}=OldSeq} | _] ->
+            [{#shard{}, {replace, _, _, _}} | _] ->
+                % Don't attempt to replace a replacement
+                SeqArg = 0;
+            [{#shard{node = OldNode}, OldSeq} | _] ->
                 SeqArg = make_replacement_arg(OldNode, OldSeq);
             _ ->
                 % TODO this clause is probably unreachable in the N>2


[18/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Set io_priority properly for all requests

Notably, changes wasn't calling set_io_priority so was not affected by
maintenance_mode.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/5731bf3a
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/5731bf3a
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/5731bf3a

Branch: refs/heads/windsor-merge
Commit: 5731bf3a0ad811ec64505d9030c738a16600c473
Parents: c0d13fb
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Oct 4 16:35:13 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_rpc.erl | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/5731bf3a/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 7f9b88b..d61da40 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -37,7 +37,7 @@ changes(DbName, Args, StartSeq) ->
 changes(DbName, #changes_args{} = Args, StartSeq, DbOptions) ->
     changes(DbName, [Args], StartSeq, DbOptions);
 changes(DbName, Options, StartVector, DbOptions) ->
-    erlang:put(io_priority, {interactive, DbName}),
+    set_io_priority(DbName, DbOptions),
     #changes_args{dir=Dir} = Args = lists:keyfind(changes_args, 1, Options),
     case get_or_create_db(DbName, DbOptions) of
     {ok, Db} ->
@@ -57,6 +57,7 @@ changes(DbName, Options, StartVector, DbOptions) ->
     end.
 
 all_docs(DbName, Options, #mrargs{keys=undefined} = Args0) ->
+    set_io_priority(DbName, Options),
     Args = fix_skip_and_limit(Args0),
     {ok, Db} = get_or_create_db(DbName, Options),
     VAcc0 = #vacc{db=Db},
@@ -67,6 +68,7 @@ map_view(DbName, DDoc, ViewName, Args0) ->
     map_view(DbName, DDoc, ViewName, Args0, []).
 
 map_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
+    set_io_priority(DbName, DbOptions),
     Args = fix_skip_and_limit(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
     VAcc0 = #vacc{db=Db},
@@ -77,6 +79,7 @@ reduce_view(DbName, DDoc, ViewName, Args0) ->
     reduce_view(DbName, DDoc, ViewName, Args0, []).
 
 reduce_view(DbName, DDoc, ViewName, Args0, DbOptions) ->
+    set_io_priority(DbName, DbOptions),
     Args = fix_skip_and_limit(Args0),
     {ok, Db} = get_or_create_db(DbName, DbOptions),
     VAcc0 = #vacc{db=Db},


[47/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Fix fabric_db_update_listener code upgrades

Changes processes waiting idle for db update messages will never upgrade
their code. This just adds a timeout before recursing through the module
exports table to load new code.

BugzId: 27660


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/63ec0f91
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/63ec0f91
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/63ec0f91

Branch: refs/heads/windsor-merge
Commit: 63ec0f91a1d5069c255aeeda0e2efbb251bc59e3
Parents: 74e4a79
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Jan 30 18:40:04 2014 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:43 2014 +0100

----------------------------------------------------------------------
 src/fabric_db_update_listener.erl | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/63ec0f91/src/fabric_db_update_listener.erl
----------------------------------------------------------------------
diff --git a/src/fabric_db_update_listener.erl b/src/fabric_db_update_listener.erl
index 9a0f2c6..ab69491 100644
--- a/src/fabric_db_update_listener.erl
+++ b/src/fabric_db_update_listener.erl
@@ -115,6 +115,8 @@ wait_db_updated({Pid, Ref}) ->
             State;
         {'DOWN', MonRef, process, Pid, Reason} ->
             throw({changes_feed_died, Reason})
+    after 300000 ->
+        ?MODULE:wait_db_updated({Pid, Ref})
     end.
 
 receive_results(Workers, Acc0, Timeout) ->


[27/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Report errors opening documents during _all_docs

All errors are currently ignored in the receive statements and
eventually we time out.  This patch causes fabric to report the error and
terminate quickly instead of waiting.

BugzID: 24580


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/5e2d376c
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/5e2d376c
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/5e2d376c

Branch: refs/heads/windsor-merge
Commit: 5e2d376c3ca19f42faeab66aa432f7b3d17955b4
Parents: 8e80f22
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Fri Oct 25 22:59:23 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_all_docs.erl | 39 +++++++++++++++++++++++++--------------
 1 file changed, 25 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/5e2d376c/src/fabric_view_all_docs.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_all_docs.erl b/src/fabric_view_all_docs.erl
index 28a1f91..cda748a 100644
--- a/src/fabric_view_all_docs.erl
+++ b/src/fabric_view_all_docs.erl
@@ -69,12 +69,17 @@ go(DbName, Options, QueryArgs, Callback, Acc0) ->
         false -> Keys2
     end,
     Timeout = fabric_util:all_docs_timeout(),
-    receive {'DOWN', Ref0, _, _, {ok, TotalRows}} ->
-        {ok, Acc1} = Callback({meta, [{total, TotalRows}]}, Acc0),
-        {ok, Acc2} = doc_receive_loop(
-            Keys3, queue:new(), SpawnFun, MaxJobs, Callback, Acc1
-        ),
-        Callback(complete, Acc2)
+    receive {'DOWN', Ref0, _, _, Result} ->
+        case Result of
+            {ok, TotalRows} ->
+                {ok, Acc1} = Callback({meta, [{total, TotalRows}]}, Acc0),
+                {ok, Acc2} = doc_receive_loop(
+                    Keys3, queue:new(), SpawnFun, MaxJobs, Callback, Acc1
+                ),
+                Callback(complete, Acc2);
+            Error ->
+                Callback({error, Error}, Acc0)
+        end
     after Timeout ->
         Callback(timeout, Acc0)
     end.
@@ -178,15 +183,21 @@ doc_receive_loop(Keys, Pids, SpawnFun, MaxJobs, Callback, AccIn) ->
     _ ->
         {{value, {Pid, Ref}}, RestPids} = queue:out(Pids),
         Timeout = fabric_util:all_docs_timeout(),
-        receive {'DOWN', Ref, process, Pid, #view_row{} = Row} ->
-            case Callback(fabric_view:transform_row(Row), AccIn) of
-            {ok, Acc} ->
-                doc_receive_loop(
-                    Keys, RestPids, SpawnFun, MaxJobs, Callback, Acc
-                );
-            {stop, Acc} ->
+        receive {'DOWN', Ref, process, Pid, Row} ->
+            case Row of
+            #view_row{} ->
+                case Callback(fabric_view:transform_row(Row), AccIn) of
+                {ok, Acc} ->
+                    doc_receive_loop(
+                        Keys, RestPids, SpawnFun, MaxJobs, Callback, Acc
+                    );
+                {stop, Acc} ->
+                    cancel_read_pids(RestPids),
+                    {ok, Acc}
+                end;
+            Error ->
                 cancel_read_pids(RestPids),
-                {ok, Acc}
+                Callback({error, Error}, AccIn)
             end
         after Timeout ->
             timeout


[37/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Throw errors when starting changes streams

Previously we were calling the user supplied callback on these errors
which caused a badmatch in keep_sending_changes. This just throws the
error and lets chttpd handle formatting it for the user.

BugzId: 26122


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/4d9106e8
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/4d9106e8
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/4d9106e8

Branch: refs/heads/windsor-merge
Commit: 4d9106e81176fffa0db7960048229c30b474b3dc
Parents: 8edaa88
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Dec 12 12:45:10 2013 -0600
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:43 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/4d9106e8/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 1797539..480f6bb 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -174,8 +174,10 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
                 after
                     fabric_util:cleanup(Workers)
                 end;
+            {error, Reason} ->
+                throw({error, Reason});
             Else ->
-                Callback(Else, AccIn)
+                throw({error, Else})
         end
     after
         rexi_monitor:stop(RexiMon)


[39/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Avoid badarith error during cluster upgrade

BugzID: 26631


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/2e77350d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/2e77350d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/2e77350d

Branch: refs/heads/windsor-merge
Commit: 2e77350def9ba1ec33b814c06411b0c125de7d8c
Parents: dbcbfc1
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Mon Jan 6 12:02:18 2014 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:43 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/2e77350d/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 5d0672e..c4eab22 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -223,7 +223,7 @@ handle_message({change, Props}, {Worker, _}, #collector{limit=0} = State) ->
     O1 = case fabric_dict:lookup_element(Worker, O0) of
         null ->
             % Use Pending+1 because we're ignoring this row in the response
-            Pending = couch_util:get_value(pending, Props),
+            Pending = couch_util:get_value(pending, Props, 0),
             fabric_dict:store(Worker, Pending+1, O0);
         _ ->
             O0


[28/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Switch from ddoc to id+rev protocol for views


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/8e80f227
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/8e80f227
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/8e80f227

Branch: refs/heads/windsor-merge
Commit: 8e80f2273d09b88ac14f9ef7e2214801d568baaf
Parents: e8cf1bb
Author: Brian Mitchell <br...@p2p.io>
Authored: Fri Oct 25 11:44:00 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:42 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_map.erl    | 2 +-
 src/fabric_view_reduce.erl | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/8e80f227/src/fabric_view_map.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index d2c010a..1201daf 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -26,7 +26,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
 go(DbName, DDoc, View, Args, Callback, Acc) ->
     Shards = fabric_view:get_shards(DbName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
-    RPCArgs = [DDoc, View, Args],
+    RPCArgs = [fabric_util:doc_id_and_rev(DDoc), View, Args],
     StartFun = fun(Shard) ->
         hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
     end,

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/8e80f227/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 67c36a3..2e0d1f2 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -25,7 +25,7 @@ go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -
 
 go(DbName, DDoc, VName, Args, Callback, Acc, {red, {_, Lang, _}, _}=VInfo) ->
     RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
-    RPCArgs = [DDoc, VName, Args],
+    RPCArgs = [fabric_util:doc_id_and_rev(DDoc), VName, Args],
     Shards = fabric_view:get_shards(DbName, Args),
     Repls = fabric_view:get_shard_replacements(DbName, Shards),
     StartFun = fun(Shard) ->


[07/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
[1/2] Use proplist instead of #change record

This commit lays the groundwork by translating the record into a
proplist.  A follow-on commit will cause the RPC workers to send the
proplist instead of the record.

It also converts the 'complete' tuple into a tagged proplist.

BugzID: 24236


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/865b5555
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/865b5555
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/865b5555

Branch: refs/heads/windsor-merge
Commit: 865b5555771e6099c9b34b1b14d2428ce02e50c3
Parents: cb388f2
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Thu Oct 17 11:45:08 2013 -0400
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 56 ++++++++++++++++++++++++++++------------
 1 file changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/865b5555/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 19824bc..479c32b 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -189,10 +189,24 @@ handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
     fabric_view:handle_worker_exit(State, Worker, Reason);
 
+% Temporary upgrade clause - Case 24236
+handle_message({complete, Key}, Worker, State) when is_tuple(Key) ->
+    handle_message({complete, [{seq, Key}]}, Worker, State);
+
 handle_message(_, _, #collector{limit=0} = State) ->
     {stop, State};
 
-handle_message(#change{key=Key} = Row0, {Worker, From}, St) ->
+handle_message(#change{} = Row, {Worker, From}, St) ->
+    Change = {change, [
+        {seq, Row#change.key},
+        {id, Row#change.id},
+        {changes, Row#change.value},
+        {deleted, Row#change.deleted},
+        {doc, Row#change.doc}
+    ]},
+    handle_message(Change, {Worker, From}, St);
+
+handle_message({change, Props}, {Worker, From}, St) ->
     #collector{
         query_args = #changes_args{include_docs=IncludeDocs},
         callback = Callback,
@@ -201,15 +215,15 @@ handle_message(#change{key=Key} = Row0, {Worker, From}, St) ->
         user_acc = AccIn
     } = St,
     true = fabric_dict:is_key(Worker, S0),
-    S1 = fabric_dict:store(Worker, Key, S0),
+    S1 = fabric_dict:store(Worker, couch_util:get_value(seq, Props), S0),
     % Temporary hack for FB 23637
     Interval = erlang:get(changes_seq_interval),
     if (Interval == undefined) orelse (Limit rem Interval == 0) ->
-        Row = Row0#change{key = pack_seqs(S1)};
+        Props2 = lists:keyreplace(seq, 1, Props, {seq, pack_seqs(S1)});
     true ->
-        Row = Row0#change{key = null}
+        Props2 = lists:keyreplace(seq, 1, Props, {seq, null})
     end,
-    {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
+    {Go, Acc} = Callback(changes_row(Props2, IncludeDocs), AccIn),
     rexi:stream_ack(From),
     {Go, St#collector{counters=S1, limit=Limit-1, user_acc=Acc}};
 
@@ -220,7 +234,8 @@ handle_message({no_pass, Seq}, {Worker, From}, St) ->
     rexi:stream_ack(From),
     {ok, St#collector{counters=S1}};
 
-handle_message({complete, Key}, Worker, State) ->
+handle_message({complete, Props}, Worker, State) ->
+    Key = couch_util:get_value(seq, Props),
     #collector{
         counters = S0,
         total_rows = Completed % override
@@ -350,16 +365,25 @@ do_unpack_seqs(Opaque, DbName) ->
             Unpacked ++ [{R, 0} || R <- Replacements]
     end.
 
-changes_row(#change{key=Seq, id=Id, value=Value, deleted=true, doc=Doc}, true) ->
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}, {doc, Doc}]}};
-changes_row(#change{key=Seq, id=Id, value=Value, deleted=true}, false) ->
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {deleted, true}]}};
-changes_row(#change{key=Seq, id=Id, value=Value, doc={error,Reason}}, true) ->
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {error,Reason}]}};
-changes_row(#change{key=Seq, id=Id, value=Value, doc=Doc}, true) ->
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}, {doc,Doc}]}};
-changes_row(#change{key=Seq, id=Id, value=Value}, false) ->
-    {change, {[{seq,Seq}, {id,Id}, {changes,Value}]}}.
+changes_row(Props0, IncludeDocs) ->
+    Props1 = case {IncludeDocs, couch_util:get_value(doc, Props0)} of
+        {true, {error, Reason}} ->
+            % Transform {doc, {error, Reason}} to {error, Reason} for JSON
+            lists:keyreplace(doc, 1, Props0, {error, Reason});
+        {false, _} ->
+            lists:keydelete(doc, 1, Props0);
+        _ ->
+            Props0
+    end,
+    Props2 = case couch_util:get_value(deleted, Props1) of
+        true ->
+            Props1;
+        _ ->
+            lists:keydelete(deleted, 1, Props1)
+    end,
+    Allowed = [seq, id, changes, deleted, doc, error], % error: from the doc case above
+    Props3 = lists:filter(fun({K,_V}) -> lists:member(K, Allowed) end, Props2),
+    {change, {Props3}}.
 
 find_replacement_shards(#shard{range=Range}, AllShards) ->
     % TODO make this moar betta -- we might have split or merged the partition


[20/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Provide replacement shards in map views

BugzId: 20423


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/bc57da8b
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/bc57da8b
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/bc57da8b

Branch: refs/heads/windsor-merge
Commit: bc57da8bb34ed84c4b6e056fd6df7227b2f6b036
Parents: 45bee39
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Sep 12 10:22:00 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_map.erl | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/bc57da8b/src/fabric_view_map.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_map.erl b/src/fabric_view_map.erl
index cf70568..d2c010a 100644
--- a/src/fabric_view_map.erl
+++ b/src/fabric_view_map.erl
@@ -25,11 +25,15 @@ go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
 
 go(DbName, DDoc, View, Args, Callback, Acc) ->
     Shards = fabric_view:get_shards(DbName, Args),
-    Workers0 = fabric_util:submit_jobs(
-            Shards, fabric_rpc, map_view, [DDoc, View, Args]),
+    Repls = fabric_view:get_shard_replacements(DbName, Shards),
+    RPCArgs = [DDoc, View, Args],
+    StartFun = fun(Shard) ->
+        hd(fabric_util:submit_jobs([Shard], fabric_rpc, map_view, RPCArgs))
+    end,
+    Workers0 = fabric_util:submit_jobs(Shards, fabric_rpc, map_view, RPCArgs),
     RexiMon = fabric_util:create_monitors(Workers0),
     try
-        case fabric_util:stream_start(Workers0, #shard.ref) of
+        case fabric_util:stream_start(Workers0, #shard.ref, StartFun, Repls) of
             {ok, Workers} ->
                 try
                     go(DbName, Workers, Args, Callback, Acc)
@@ -74,11 +78,6 @@ handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
     fabric_view:handle_worker_exit(State, Worker, Reason);
 
-handle_message({rexi_EXIT, Reason}, _, State) ->
-    #collector{callback=Callback, user_acc=Acc} = State,
-    {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-    {error, Resp};
-
 handle_message({meta, Meta0}, {Worker, From}, State) ->
     Tot = couch_util:get_value(total, Meta0, 0),
     Off = couch_util:get_value(offset, Meta0, 0),


[21/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Add utility function to find shard replacements

Streaming RPC coordinators can use this function to generate a list of
shard replacements that is suitable to be passed to
fabric_util:stream_start/4.

BugzId: 20423


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/45bee39d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/45bee39d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/45bee39d

Branch: refs/heads/windsor-merge
Commit: 45bee39d8cc809c709bf52f7e90b7440e31e75ed
Parents: f225697
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Sep 12 13:30:40 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_view.erl | 31 ++++++++++++++++++++++++++++++-
 1 file changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/45bee39d/src/fabric_view.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view.erl b/src/fabric_view.erl
index 690625b..7cd5321 100644
--- a/src/fabric_view.erl
+++ b/src/fabric_view.erl
@@ -14,7 +14,8 @@
 
 -export([is_progress_possible/1, remove_overlapping_shards/2, maybe_send_row/1,
     transform_row/1, keydict/1, extract_view/4, get_shards/2,
-    check_down_shards/2, handle_worker_exit/3]).
+    check_down_shards/2, handle_worker_exit/3,
+    get_shard_replacements/2]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
@@ -305,6 +306,34 @@ get_shards(DbName, #mrargs{stale=Stale})
 get_shards(DbName, #mrargs{stale=false}) ->
     mem3:shards(DbName).
 
+get_shard_replacements(DbName, UsedShards) ->
+    % We only want to generate a replacements list from shards
+    % that aren't already used.
+    AllLiveShards = mem3:live_shards(DbName, [node() | nodes()]),
+    UnusedShards = AllLiveShards -- UsedShards,
+
+    % If we have more than one copy of a range then we don't
+    % want to try and add a replacement to any copy.
+    RangeCounts = lists:foldl(fun(#shard{range=R}, Acc) ->
+        dict:update_counter(R, 1, Acc)
+    end, dict:new(), UsedShards),
+
+    % For each seq shard range with a count of 1, find any
+    % possible replacements from the unused shards. The
+    % replacement list is keyed by range.
+    lists:foldl(fun(#shard{range=Range}, Acc) ->
+        case dict:find(Range, RangeCounts) of
+            {ok, 1} ->
+                Repls = [S || S <- UnusedShards, S#shard.range =:= Range],
+                % Only keep non-empty lists of replacements
+                if Repls == [] -> Acc; true ->
+                    [{Range, Repls} | Acc]
+                end;
+            _ ->
+                Acc
+        end
+    end, [], UsedShards).
+
 % unit test
 is_progress_possible_test() ->
     EndPoint = 2 bsl 31,


[15/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Update _changes coordinator to use new RPC APIs

BugzId: 21755


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/c0d13fb6
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/c0d13fb6
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/c0d13fb6

Branch: refs/heads/windsor-merge
Commit: c0d13fb66b02763d73e420bcb1b6cf8ab492319c
Parents: 6462e84
Author: Paul J. Davis <pa...@gmail.com>
Authored: Wed Sep 4 15:59:39 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_view_changes.erl | 155 +++++++++++++++------------------------
 1 file changed, 59 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/c0d13fb6/src/fabric_view_changes.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl
index 8d162ce..69d9d09 100644
--- a/src/fabric_view_changes.erl
+++ b/src/fabric_view_changes.erl
@@ -130,9 +130,30 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
             end, find_replacement_shards(Shard, AllLiveShards))
         end
     end, unpack_seqs(PackedSeqs, DbName)),
-    {Workers, _} = lists:unzip(Seqs),
-    RexiMon = fabric_util:create_monitors(Workers),
+    {Workers0, _} = lists:unzip(Seqs),
+    RexiMon = fabric_util:create_monitors(Workers0),
+    try
+        case fabric_util:stream_start(Workers0, #shard.ref) of
+            {ok, Workers} ->
+                try
+                    LiveSeqs = lists:filter(fun({W, _S}) ->
+                        lists:member(W, Workers)
+                    end, Seqs),
+                    send_changes(DbName, Workers, LiveSeqs, ChangesArgs,
+                            Callback, AccIn, Timeout)
+                after
+                    fabric_util:cleanup(Workers)
+                end;
+            Else ->
+                Callback(Else, AccIn)
+        end
+    after
+        rexi_monitor:stop(RexiMon)
+    end.
+
+send_changes(DbName, Workers, Seqs, ChangesArgs, Callback, AccIn, Timeout) ->
     State = #collector{
+        db_name = DbName,
         query_args = ChangesArgs,
         callback = Callback,
         counters = orddict:from_list(Seqs),
@@ -141,12 +162,7 @@ send_changes(DbName, ChangesArgs, Callback, PackedSeqs, AccIn, Timeout) ->
         rows = Seqs % store sequence positions instead
     },
     %% TODO: errors need to be handled here
-    try
-        receive_results(Workers, State, Timeout, Callback)
-    after
-        rexi_monitor:stop(RexiMon),
-        fabric_util:cleanup(Workers)
-    end.
+    receive_results(Workers, State, Timeout, Callback).
 
 receive_results(Workers, State, Timeout, Callback) ->
     case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State,
@@ -159,25 +175,11 @@ receive_results(Workers, State, Timeout, Callback) ->
         {ok, NewState}
     end.
 
-handle_message({rexi_DOWN, _, {_, NodeRef}, _}, nil, State) ->
-    fabric_view:remove_down_shards(State, NodeRef);
+handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
+    fabric_view:check_down_shards(State, NodeRef);
 
 handle_message({rexi_EXIT, Reason}, Worker, State) ->
-    #collector{
-        callback=Callback,
-        counters=Counters0,
-        rows = Seqs0,
-        user_acc=Acc
-    } = State,
-    Counters = fabric_dict:erase(Worker, Counters0),
-    Seqs = fabric_dict:erase(Worker, Seqs0),
-    case fabric_view:is_progress_possible(Counters) of
-    true ->
-        {ok, State#collector{counters = Counters, rows=Seqs}};
-    false ->
-        {ok, Resp} = Callback({error, fabric_util:error_info(Reason)}, Acc),
-        {error, Resp}
-    end;
+    fabric_view:handle_worker_exit(State, Worker, Reason);
 
 handle_message(_, _, #collector{limit=0} = State) ->
     {stop, State};
@@ -190,84 +192,45 @@ handle_message(#change{key=Key} = Row0, {Worker, From}, St) ->
         limit = Limit,
         user_acc = AccIn
     } = St,
-    case fabric_dict:lookup_element(Worker, S0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate it
-        gen_server:reply(From, stop),
-        {ok, St};
-    _ ->
-        S1 = fabric_dict:store(Worker, Key, S0),
-        S2 = fabric_view:remove_overlapping_shards(Worker, S1),
-        % this check should not be necessary at all, as holes in the ranges
-        % created from DOWN messages would have led to errors
-        case fabric_view:is_progress_possible(S2) of
-        true ->
-            % Temporary hack for FB 23637
-            Interval = erlang:get(changes_seq_interval),
-            if (Interval == undefined) orelse (Limit rem Interval == 0) ->
-                Row = Row0#change{key = pack_seqs(S2)};
-            true ->
-                Row = Row0#change{key = null}
-            end,
-            {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
-            gen_server:reply(From, Go),
-            {Go, St#collector{counters=S2, limit=Limit-1, user_acc=Acc}};
-        false ->
-            Reason = {range_not_covered, <<"progress not possible">>},
-            Callback({error, Reason}, AccIn),
-            gen_server:reply(From, stop),
-            {stop, St#collector{counters=S2}}
-        end
-    end;
+    true = fabric_dict:is_key(Worker, S0),
+    S1 = fabric_dict:store(Worker, Key, S0),
+    % Temporary hack for FB 23637
+    Interval = erlang:get(changes_seq_interval),
+    if (Interval == undefined) orelse (Limit rem Interval == 0) ->
+        Row = Row0#change{key = pack_seqs(S1)};
+    true ->
+        Row = Row0#change{key = null}
+    end,
+    {Go, Acc} = Callback(changes_row(Row, IncludeDocs), AccIn),
+    rexi:stream_ack(From),
+    {Go, St#collector{counters=S1, limit=Limit-1, user_acc=Acc}};
 
 handle_message({no_pass, Seq}, {Worker, From}, St) ->
-    #collector{
-        counters = S0
-    } = St,
-    case fabric_dict:lookup_element(Worker, S0) of
-    undefined ->
-        % this worker lost the race with other partition copies, terminate it
-        gen_server:reply(From, stop),
-        {ok, St};
-    _ ->
-        S1 = fabric_dict:store(Worker, Seq, S0),
-        S2 = fabric_view:remove_overlapping_shards(Worker, S1),
-        gen_server:reply(From, ok),
-        {ok, St#collector{counters=S2}}
-    end;
+    #collector{counters = S0} = St,
+    true = fabric_dict:is_key(Worker, S0),
+    S1 = fabric_dict:store(Worker, Seq, S0),
+    rexi:stream_ack(From),
+    {ok, St#collector{counters=S1}};
 
 handle_message({complete, Key}, Worker, State) ->
     #collector{
-        callback = Callback,
         counters = S0,
-        total_rows = Completed, % override
-        user_acc = Acc
+        total_rows = Completed % override
     } = State,
-    case fabric_dict:lookup_element(Worker, S0) of
-    undefined ->
-        {ok, State};
-    _ ->
-        S1 = fabric_dict:store(Worker, Key, S0),
-        % unlikely to have overlaps here, but possible w/ filters
-        S2 = fabric_view:remove_overlapping_shards(Worker, S1),
-        NewState = State#collector{counters=S2, total_rows=Completed+1},
-        case fabric_dict:size(S2) =:= (Completed+1) of
-        true ->
-            % check ranges are covered, again this should not be neccessary
-            % as any holes in the ranges due to DOWN messages would have errored
-            % out sooner
-            case fabric_view:is_progress_possible(S2) of
-            true ->
-                {stop, NewState};
-            false ->
-                Reason = {range_not_covered, <<"progress not possible">>},
-                Callback({error, Reason}, Acc),
-                {stop, NewState}
-            end;
-        false ->
-            {ok, NewState}
-        end
-    end.
+    true = fabric_dict:is_key(Worker, S0),
+    S1 = fabric_dict:store(Worker, Key, S0),
+    NewState = State#collector{counters=S1, total_rows=Completed+1},
+    % We're relying on S1 having exactly the number of workers that
+    % are participating in this response. With the new stream_start
+    % that's a bit more obvious but historically it wasn't quite
+    % so clear. The Completed variable is just a hacky override
+    % of the total_rows field in the #collector{} record.
+    NumWorkers = fabric_dict:size(S1),
+    Go = case NumWorkers =:= (Completed+1) of
+        true -> stop;
+        false -> ok
+    end,
+    {Go, NewState}.
 
 make_changes_args(#changes_args{style=Style, filter=undefined}=Args) ->
     Args#changes_args{filter = Style};


[06/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Add fabric_dict:fetch_keys/1


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/28a9e3df
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/28a9e3df
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/28a9e3df

Branch: refs/heads/windsor-merge
Commit: 28a9e3df6ed981b86935bdf79a9444ea6fcf8976
Parents: 3222511
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Oct 4 13:16:03 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:40 2014 +0100

----------------------------------------------------------------------
 src/fabric_dict.erl | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/28a9e3df/src/fabric_dict.erl
----------------------------------------------------------------------
diff --git a/src/fabric_dict.erl b/src/fabric_dict.erl
index 20700c0..f88ca97 100644
--- a/src/fabric_dict.erl
+++ b/src/fabric_dict.erl
@@ -22,6 +22,9 @@ init(Keys, InitialValue) ->
 is_key(Key, Dict) ->
     orddict:is_key(Key, Dict).
 
+fetch_keys(Dict) ->
+    orddict:fetch_keys(Dict).
+
 decrement_all(Dict) ->
     [{K,V-1} || {K,V} <- Dict].
 


[19/49] fabric commit: updated refs/heads/windsor-merge to b1c0030

Posted by rn...@apache.org.
Fix `fabric:get_db_info/1`

We weren't properly removing shards that had an error as well as
returning errors to functions that don't expect them.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/0f454787
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/0f454787
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/0f454787

Branch: refs/heads/windsor-merge
Commit: 0f45478727019daec3131441eaac8ec9b0bba14e
Parents: 5731bf3
Author: Paul J. Davis <pa...@gmail.com>
Authored: Fri Oct 4 16:34:11 2013 -0500
Committer: Robert Newson <rn...@apache.org>
Committed: Fri Aug 1 15:33:41 2014 +0100

----------------------------------------------------------------------
 src/fabric_db_info.erl | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/0f454787/src/fabric_db_info.erl
----------------------------------------------------------------------
diff --git a/src/fabric_db_info.erl b/src/fabric_db_info.erl
index 58139e8..de88632 100644
--- a/src/fabric_db_info.erl
+++ b/src/fabric_db_info.erl
@@ -21,9 +21,13 @@ go(DbName) ->
     Shards = mem3:shards(DbName),
     Workers = fabric_util:submit_jobs(Shards, get_db_info, []),
     RexiMon = fabric_util:create_monitors(Shards),
+    Fun = fun handle_message/3,
     Acc0 = {fabric_dict:init(Workers, nil), []},
     try
-        fabric_util:recv(Workers, #shard.ref, fun handle_message/3, Acc0)
+        case fabric_util:recv(Workers, #shard.ref, Fun, Acc0) of
+            {ok, Acc} -> {ok, Acc};
+            {error, Error} -> throw(Error)
+        end
     after
         rexi_monitor:stop(RexiMon)
     end.
@@ -37,7 +41,7 @@ handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Shard, {Counters, Acc}) ->
     end;
 
 handle_message({rexi_EXIT, Reason}, Shard, {Counters, Acc}) ->
-    NewCounters = lists:keydelete(Shard, #shard.ref, Counters),
+    NewCounters = fabric_dict:erase(Shard, Counters),
     case fabric_view:is_progress_possible(NewCounters) of
     true ->
         {ok, {NewCounters, Acc}};