You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2014/02/12 22:17:09 UTC

fabric commit: updated refs/heads/1993-bigcouch-couch-mrview to 1e41d80

Updated Branches:
  refs/heads/1993-bigcouch-couch-mrview d6955af95 -> 1e41d8012


WIP: add reduce support in fabric with couch_mrview


Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/1e41d801
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/1e41d801
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/1e41d801

Branch: refs/heads/1993-bigcouch-couch-mrview
Commit: 1e41d8012932b575a450a17a03e9ec1a7ff83ef9
Parents: d6955af
Author: Russell Branca <ch...@gmail.com>
Authored: Wed Feb 12 13:11:42 2014 -0800
Committer: Russell Branca <ch...@gmail.com>
Committed: Wed Feb 12 13:11:42 2014 -0800

----------------------------------------------------------------------
 src/fabric.erl             | 41 ++++++++++++++++------
 src/fabric_rpc.erl         | 75 +++++++++++++++++------------------------
 src/fabric_view.erl        |  7 ++--
 src/fabric_view_reduce.erl | 29 +++++++---------
 4 files changed, 77 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/1e41d801/src/fabric.erl
----------------------------------------------------------------------
diff --git a/src/fabric.erl b/src/fabric.erl
index 391c5f8..24bd9b8 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -278,15 +278,34 @@ query_view(DbName, DesignName, ViewName, QueryArgs) ->
 -spec query_view(dbname(), #doc{} | binary(), iodata(), callback(), any(),
         #mrargs{}) ->
     any().
-query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) ->
+query_view(DbName, GroupId, ViewName, Callback, Acc0, QueryArgs)
+        when is_binary(GroupId) ->
+    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
+    query_view(DbName, DDoc, ViewName, Callback, Acc0, QueryArgs);
+query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs0) ->
     Db = dbname(DbName), View = name(ViewName),
-    case is_reduce_view(Db, Design, View, QueryArgs) of
-    true ->
-        Mod = fabric_view_reduce;
-    false ->
-        Mod = fabric_view_map
-    end,
-    Mod:go(Db, Design, View, QueryArgs, Callback, Acc0).
+    {VInfo, QueryArgs2} = couch_util:with_db(
+        Db,
+        fun(WDb) ->
+            {ok, VInfo0, _Sig, QueryArgs1} =
+                couch_mrview_util:get_view(WDb, Design, ViewName, QueryArgs0),
+            {VInfo0, QueryArgs1}
+        end
+    ),
+    case is_reduce_view(VInfo) of
+        true ->
+            fabric_view_reduce:go(
+                Db,
+                Design,
+                View,
+                QueryArgs2,
+                Callback,
+                Acc0,
+                VInfo
+            );
+        false ->
+            fabric_view_map:go(Db, Design, View, QueryArgs2, Callback, Acc0)
+    end.
 
 %% @doc retrieve info about a view group, disk size, language, whether compaction
 %%      is running and so forth
@@ -313,7 +332,7 @@ design_docs(DbName) ->
         end_key = <<"_design0">>,
         include_docs=true
     },
-    Callback = fun({total_and_offset, _, _}, []) ->
+    Callback = fun({meta, _}, []) ->
         {ok, []};
     ({row, Props}, Acc) ->
         case couch_util:get_value(id, Props) of
@@ -437,8 +456,8 @@ default_callback(complete, Acc) ->
 default_callback(Row, Acc) ->
     {ok, [Row | Acc]}.
 
-is_reduce_view(_, _, _, #mrargs{view_type=Reduce}) ->
-    Reduce =:= reduce.
+is_reduce_view({Reduce, _, _}) ->
+    Reduce =:= red.
 
 %% @doc convenience method for use in the shell, converts a keylist
 %%      to a `changes_args' record

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/1e41d801/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 727845f..074beb2 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -97,46 +97,10 @@ map_view(DbName, DDoc, ViewName, Args) ->
     VAcc0 = #vacc{db=Db},
     couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0).
 
-reduce_view(DbName, #doc{} = DDoc, ViewName, QueryArgs) ->
-    Group = couch_view_group:design_doc_to_view_group(DDoc),
-    reduce_view(DbName, Group, ViewName, QueryArgs);
-reduce_view(DbName, Group0, ViewName, QueryArgs) ->
-    erlang:put(io_priority, {interactive, DbName}),
+reduce_view(DbName, DDoc, ViewName, Args) ->
     {ok, Db} = get_or_create_db(DbName, []),
-    #mrargs{
-        group_level = GroupLevel,
-        limit = Limit,
-        skip = Skip,
-        keys = Keys,
-        stale = Stale,
-        extra = Extra
-    } = QueryArgs,
-    set_io_priority(DbName, Extra),
-    GroupFun = group_rows_fun(GroupLevel),
-    {LastSeq, MinSeq} = calculate_seqs(Db, Stale),
-    {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
-    {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
-    maybe_update_view_group(Pid, LastSeq, Stale),
-    Lang = couch_view_group:get_language(Group),
-    Views = couch_view_group:get_views(Group),
-    erlang:monitor(process, couch_view_group:get_fd(Group)),
-    {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce),
-    ReduceView = {reduce, NthRed, Lang, View},
-    Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip},
-    case Keys of
-    undefined ->
-        Options0 = couch_httpd_view:make_key_options(QueryArgs),
-        Options = [{key_group_fun, GroupFun} | Options0],
-        couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options);
-    _ ->
-        lists:map(fun(Key) ->
-            KeyArgs = QueryArgs#mrargs{start_key=Key, end_key=Key},
-            Options0 = couch_httpd_view:make_key_options(KeyArgs),
-            Options = [{key_group_fun, GroupFun} | Options0],
-            couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options)
-        end, Keys)
-    end,
-    rexi:reply(complete).
+    VAcc0 = #vacc{db=Db},
+    couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0).
 
 calculate_seqs(Db, Stale) ->
     LastSeq = couch_db:get_update_seq(Db),
@@ -268,7 +232,7 @@ get_or_create_db(DbName, Options) ->
     end.
 
 
-view_cb({meta, Meta}, #vacc{}=Acc) ->
+view_cb({meta, Meta}, Acc) ->
     % Map function starting
     Total = couch_util:get_value(total, Meta),
     Offset = couch_util:get_value(offset, Meta),
@@ -281,7 +245,7 @@ view_cb({meta, Meta}, #vacc{}=Acc) ->
         timeout ->
             exit(timeout)
     end;
-view_cb({row, Row}, #vacc{}=Acc) ->
+view_cb({row, Row}, Acc) ->
     % Adding another row
     ViewRow = #view_row{
         id = couch_util:get_value(id, Row),
@@ -301,6 +265,28 @@ view_cb(complete, Acc) ->
     {ok, Acc}.
 
 
+reduce_cb({meta, Meta}, Acc) ->
+    % Reduce view starting
+    Total = couch_util:get_value(total, Meta),
+    Offset = couch_util:get_value(offset, Meta),
+    case rexi:sync_reply({total_and_offset, Total, Offset}) of
+        ok ->
+            {ok, Acc};
+        stop ->
+            exit(normal);
+        timeout ->
+            exit(timeout)
+    end;
+reduce_cb({row, Row}, Acc) ->
+    % Adding another row
+    Key = couch_util:get_value(key, Row),
+    Value = couch_util:get_value(value, Row),
+    send(Key, Value, Acc);
+reduce_cb(complete, Acc) ->
+    % Finish view output
+    rexi:reply(complete),
+    {ok, Acc}.
+
 view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) ->
     % matches for _all_docs and translates #full_doc_info{} -> KV pair
     case couch_doc:to_doc_info(FullDocInfo) of
@@ -397,13 +383,12 @@ reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) ->
 reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0 ->
     send(K, Red, Acc).
 
-
-send(Key, Value, #view_acc{limit=Limit} = Acc) ->
+send(Key, Value, Acc) ->
     case put(fabric_sent_first_row, true) of
     undefined ->
         case rexi:sync_reply(#view_row{key=Key, value=Value}) of
         ok ->
-            {ok, Acc#view_acc{limit=Limit-1}};
+            {ok, Acc};
         stop ->
             exit(normal);
         timeout ->
@@ -412,7 +397,7 @@ send(Key, Value, #view_acc{limit=Limit} = Acc) ->
     true ->
         case rexi:stream(#view_row{key=Key, value=Value}) of
         ok ->
-            {ok, Acc#view_acc{limit=Limit-1}};
+            {ok, Acc};
         timeout ->
             exit(timeout)
         end

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/1e41d801/src/fabric_view.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view.erl b/src/fabric_view.erl
index 024b230..9fa6d5e 100644
--- a/src/fabric_view.erl
+++ b/src/fabric_view.erl
@@ -224,7 +224,10 @@ get_next_row(State) ->
     Counters1 = fabric_dict:update_counter(Worker, -1, Counters0),
     {Row, State#collector{rows = Rest, counters=Counters1}}.
 
+%% HACK: rectify nil <-> undefined discrepancies
 find_next_key(nil, Dir, RowDict) ->
+    find_next_key(undefined, Dir, RowDict);
+find_next_key(undefined, Dir, RowDict) ->
     case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of
     [] ->
         throw(complete);
@@ -262,9 +265,9 @@ transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) ->
 
 
 sort_fun(fwd) ->
-    fun(A,A) -> true; (A,B) -> couch_ejson_compare:less(A,B) end;
+    fun(A,A) -> true; (A,B) -> couch_ejson_compare:less_json(A,B) end;
 sort_fun(rev) ->
-    fun(A,A) -> true; (A,B) -> couch_ejson_compare:less(B,A) end.
+    fun(A,A) -> true; (A,B) -> couch_ejson_compare:less_json(B,A) end.
 
 extract_view(Pid, ViewName, [], _ViewType) ->
     twig:log(error, "missing_named_view ~p", [ViewName]),

http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/1e41d801/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index c922a7f..c21c8ab 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -12,29 +12,19 @@
 
 -module(fabric_view_reduce).
 
--export([go/6]).
+-export([go/7]).
 
 -include_lib("fabric/include/fabric.hrl").
 -include_lib("mem3/include/mem3.hrl").
 -include_lib("couch/include/couch_db.hrl").
 -include_lib("couch_mrview/include/couch_mrview.hrl").
 
-go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
-    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
-    go(DbName, DDoc, View, Args, Callback, Acc0);
-
-go(DbName, DDoc, VName, Args, Callback, Acc0) ->
-    Group = couch_view_group:design_doc_to_view_group(DDoc),
-    Lang = couch_view_group:get_language(Group),
-    Views = couch_view_group:get_views(Group),
-    {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce),
-    {VName, RedSrc} = lists:nth(NthRed, View#mrview.reduce_funs),
-    Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
-        Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,DDoc,VName,Args]}),
-        Shard#shard{ref = Ref}
-    end, fabric_view:get_shards(DbName, Args)),
+go(DbName, DDoc, VName, Args, Callback, Acc0, {red, {_, Lang, _}, _}=VInfo) ->
+    Shards = fabric_view:get_shards(DbName, Args),
+    Workers = fabric_util:submit_jobs(Shards, reduce_view, [DDoc, VName, Args]),
+    RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
     RexiMon = fabric_util:create_monitors(Workers),
-    #mrargs{limit = Limit, skip = Skip} = Args,
+    #mrargs{limit = Limit, skip = Skip, keys = Keys} = Args,
     OsProc = case os_proc_needed(RedSrc) of
         true -> couch_query_servers:get_os_process(Lang);
         _ -> nil
@@ -44,7 +34,7 @@ go(DbName, DDoc, VName, Args, Callback, Acc0) ->
         query_args = Args,
         callback = Callback,
         counters = fabric_dict:init(Workers, 0),
-        keys = Args#mrargs.keys,
+        keys = Keys,
         skip = Skip,
         limit = Limit,
         lang = Lang,
@@ -84,6 +74,11 @@ handle_message({rexi_EXIT, Reason}, Worker, State) ->
         {error, Resp}
     end;
 
+%% TODO: what to do with this message?
+handle_message({total_and_offset, _, _}, {_Worker, From}, State) ->
+    gen_server:reply(From, ok),
+    {ok, State};
+
 handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
     #collector{counters = Counters0, rows = Rows0} = State,
     case fabric_dict:lookup_element(Worker, Counters0) of