You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2014/08/14 20:12:46 UTC
fabric commit: updated refs/heads/windsor-merge to ac81980
Repository: couchdb-fabric
Updated Branches:
refs/heads/windsor-merge 094ce20f6 -> ac8198092
Fix use of the rexi:stream2 API
Bit of a mixup during the merge on this one as it included both the
switch to stream2 as well as the switch to using couch_mrview.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/ac819809
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/ac819809
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/ac819809
Branch: refs/heads/windsor-merge
Commit: ac8198092a1d23402c81e74607d8a0f613131c0f
Parents: 094ce20
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Aug 14 13:12:15 2014 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Aug 14 13:12:15 2014 -0500
----------------------------------------------------------------------
src/fabric_rpc.erl | 64 ++++++++++-------------------------------
src/fabric_view_reduce.erl | 3 +-
2 files changed, 17 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/ac819809/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index fd5b3a7..17dc19b 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -253,14 +253,8 @@ get_or_create_db(DbName, Options) ->
view_cb({meta, Meta}, Acc) ->
% Map function starting
- case rexi:stream2({meta, Meta}) of
- ok ->
- {ok, Acc};
- stop ->
- exit(normal);
- timeout ->
- exit(timeout)
- end;
+ ok = rexi:stream2({meta, Meta}),
+ {ok, Acc};
view_cb({row, Row}, Acc) ->
% Adding another row
ViewRow = #view_row{
@@ -269,59 +263,31 @@ view_cb({row, Row}, Acc) ->
value = couch_util:get_value(value, Row),
doc = couch_util:get_value(doc, Row)
},
- case rexi:stream2(ViewRow) of
- ok ->
- {ok, Acc};
- timeout ->
- exit(timeout)
- end;
+ ok = rexi:stream2(ViewRow),
+ {ok, Acc};
view_cb(complete, Acc) ->
% Finish view output
- rexi:reply(complete),
+ ok = rexi:stream_last(complete),
{ok, Acc}.
reduce_cb({meta, Meta}, Acc) ->
% Map function starting
- case rexi:sync_reply({meta, Meta}) of
- ok ->
- {ok, Acc};
- stop ->
- exit(normal);
- timeout ->
- exit(timeout)
- end;
+ ok = rexi:stream2({meta, Meta}),
+ {ok, Acc};
reduce_cb({row, Row}, Acc) ->
% Adding another row
- Key = couch_util:get_value(key, Row),
- Value = couch_util:get_value(value, Row),
- send(Key, Value, Acc);
+ ok = rexi:stream2(#view_row{
+ key = couch_util:get_value(key, Row),
+ value = couch_util:get_value(value, Row)
+ }),
+ {ok, Acc};
reduce_cb(complete, Acc) ->
% Finish view output
- rexi:reply(complete),
+ ok = rexi:stream_last(complete),
{ok, Acc}.
-send(Key, Value, Acc) ->
- case put(fabric_sent_first_row, true) of
- undefined ->
- case rexi:stream2(#view_row{key=Key, value=Value}) of
- ok ->
- {ok, Acc};
- stop ->
- exit(normal);
- timeout ->
- exit(timeout)
- end;
- true ->
- case rexi:stream2(#view_row{key=Key, value=Value}) of
- ok ->
- {ok, Acc};
- timeout ->
- exit(timeout)
- end
- end.
-
changes_enumerator(#doc_info{id= <<"_local/", _/binary>>, high_seq=Seq}, Acc) ->
{ok, Acc#cacc{seq = Seq, pending = Acc#cacc.pending-1}};
changes_enumerator(DocInfo, Acc) ->
@@ -347,8 +313,8 @@ changes_enumerator(DocInfo, Acc) ->
{deleted, Del} |
if IncludeDocs -> [doc_member(Db, DocInfo, Opts)]; true -> [] end
]},
- Go = rexi:stream2(ChangesRow),
- {Go, Acc#cacc{seq = Seq, pending = Pending-1}}
+ ok = rexi:stream2(ChangesRow),
+ {ok, Acc#cacc{seq = Seq, pending = Pending-1}}
end.
doc_member(Shard, DocInfo, Opts) ->
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/ac819809/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 583c8ff..8fb5b0b 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -107,7 +107,8 @@ handle_message({rexi_EXIT, Reason}, Worker, State) ->
%% message as a clean way to indicate to couch_mrview_http:view_cb that the
%% reduce response is starting.
handle_message({meta, Meta}, {_Worker, From}, State) ->
- gen_server:reply(From, ok),
+ rexi:stream_ack(From),
+
#collector{
callback = Callback,
user_acc = AccIn