You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2014/08/14 20:12:46 UTC

fabric commit: updated refs/heads/windsor-merge to ac81980

Repository: couchdb-fabric
Updated Branches:
  refs/heads/windsor-merge 094ce20f6 -> ac8198092


Fix use of the rexi:stream2 API

There was a bit of a mix-up during the merge on this one, as the merge
included both the switch to stream2 and the switch to using couch_mrview.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/ac819809
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/ac819809
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/ac819809

Branch: refs/heads/windsor-merge
Commit: ac8198092a1d23402c81e74607d8a0f613131c0f
Parents: 094ce20
Author: Paul J. Davis <pa...@gmail.com>
Authored: Thu Aug 14 13:12:15 2014 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Thu Aug 14 13:12:15 2014 -0500

----------------------------------------------------------------------
 src/fabric_rpc.erl         | 64 ++++++++++-------------------------------
 src/fabric_view_reduce.erl |  3 +-
 2 files changed, 17 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/ac819809/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index fd5b3a7..17dc19b 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -253,14 +253,8 @@ get_or_create_db(DbName, Options) ->
 
 view_cb({meta, Meta}, Acc) ->
     % Map function starting
-    case rexi:stream2({meta, Meta}) of
-        ok ->
-            {ok, Acc};
-        stop ->
-            exit(normal);
-        timeout ->
-            exit(timeout)
-    end;
+    ok = rexi:stream2({meta, Meta}),
+    {ok, Acc};
 view_cb({row, Row}, Acc) ->
     % Adding another row
     ViewRow = #view_row{
@@ -269,59 +263,31 @@ view_cb({row, Row}, Acc) ->
         value = couch_util:get_value(value, Row),
         doc = couch_util:get_value(doc, Row)
     },
-    case rexi:stream2(ViewRow) of
-        ok ->
-            {ok, Acc};
-        timeout ->
-            exit(timeout)
-    end;
+    ok = rexi:stream2(ViewRow),
+    {ok, Acc};
 view_cb(complete, Acc) ->
     % Finish view output
-    rexi:reply(complete),
+    ok = rexi:stream_last(complete),
     {ok, Acc}.
 
 
 reduce_cb({meta, Meta}, Acc) ->
     % Map function starting
-    case rexi:sync_reply({meta, Meta}) of
-        ok ->
-            {ok, Acc};
-        stop ->
-            exit(normal);
-        timeout ->
-            exit(timeout)
-    end;
+    ok = rexi:stream2({meta, Meta}),
+    {ok, Acc};
 reduce_cb({row, Row}, Acc) ->
     % Adding another row
-    Key = couch_util:get_value(key, Row),
-    Value = couch_util:get_value(value, Row),
-    send(Key, Value, Acc);
+    ok = rexi:stream2(#view_row{
+        key = couch_util:get_value(key, Row),
+        value = couch_util:get_value(value, Row)
+    }),
+    {ok, Acc};
 reduce_cb(complete, Acc) ->
     % Finish view output
-    rexi:reply(complete),
+    ok = rexi:stream_last(complete),
     {ok, Acc}.
 
 
-send(Key, Value, Acc) ->
-    case put(fabric_sent_first_row, true) of
-    undefined ->
-        case rexi:stream2(#view_row{key=Key, value=Value}) of
-        ok ->
-            {ok, Acc};
-        stop ->
-            exit(normal);
-        timeout ->
-            exit(timeout)
-        end;
-    true ->
-        case rexi:stream2(#view_row{key=Key, value=Value}) of
-        ok ->
-            {ok, Acc};
-        timeout ->
-            exit(timeout)
-        end
-    end.
-
 changes_enumerator(#doc_info{id= <<"_local/", _/binary>>, high_seq=Seq}, Acc) ->
     {ok, Acc#cacc{seq = Seq, pending = Acc#cacc.pending-1}};
 changes_enumerator(DocInfo, Acc) ->
@@ -347,8 +313,8 @@ changes_enumerator(DocInfo, Acc) ->
             {deleted, Del} |
             if IncludeDocs -> [doc_member(Db, DocInfo, Opts)]; true -> [] end
         ]},
-        Go = rexi:stream2(ChangesRow),
-        {Go, Acc#cacc{seq = Seq, pending = Pending-1}}
+        ok = rexi:stream2(ChangesRow),
+        {ok, Acc#cacc{seq = Seq, pending = Pending-1}}
     end.
 
 doc_member(Shard, DocInfo, Opts) ->

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/ac819809/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index 583c8ff..8fb5b0b 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -107,7 +107,8 @@ handle_message({rexi_EXIT, Reason}, Worker, State) ->
 %% message as a clean way to indicate to couch_mrview_http:view_cb that the
 %% reduce response is starting.
 handle_message({meta, Meta}, {_Worker, From}, State) ->
-    gen_server:reply(From, ok),
+    rexi:stream_ack(From),
+
     #collector{
         callback = Callback,
         user_acc = AccIn