You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2014/02/12 22:17:09 UTC
fabric commit: updated refs/heads/1993-bigcouch-couch-mrview to 1e41d80
Updated Branches:
refs/heads/1993-bigcouch-couch-mrview d6955af95 -> 1e41d8012
WIP: add reduce support in fabric with couch_mrview
Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/1e41d801
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/1e41d801
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/1e41d801
Branch: refs/heads/1993-bigcouch-couch-mrview
Commit: 1e41d8012932b575a450a17a03e9ec1a7ff83ef9
Parents: d6955af
Author: Russell Branca <ch...@gmail.com>
Authored: Wed Feb 12 13:11:42 2014 -0800
Committer: Russell Branca <ch...@gmail.com>
Committed: Wed Feb 12 13:11:42 2014 -0800
----------------------------------------------------------------------
src/fabric.erl | 41 ++++++++++++++++------
src/fabric_rpc.erl | 75 +++++++++++++++++------------------------
src/fabric_view.erl | 7 ++--
src/fabric_view_reduce.erl | 29 +++++++---------
4 files changed, 77 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/1e41d801/src/fabric.erl
----------------------------------------------------------------------
diff --git a/src/fabric.erl b/src/fabric.erl
index 391c5f8..24bd9b8 100644
--- a/src/fabric.erl
+++ b/src/fabric.erl
@@ -278,15 +278,34 @@ query_view(DbName, DesignName, ViewName, QueryArgs) ->
-spec query_view(dbname(), #doc{} | binary(), iodata(), callback(), any(),
#mrargs{}) ->
any().
-query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs) ->
+query_view(DbName, GroupId, ViewName, Callback, Acc0, QueryArgs)
+ when is_binary(GroupId) ->
+ {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
+ query_view(DbName, DDoc, ViewName, Callback, Acc0, QueryArgs);
+query_view(DbName, Design, ViewName, Callback, Acc0, QueryArgs0) ->
Db = dbname(DbName), View = name(ViewName),
- case is_reduce_view(Db, Design, View, QueryArgs) of
- true ->
- Mod = fabric_view_reduce;
- false ->
- Mod = fabric_view_map
- end,
- Mod:go(Db, Design, View, QueryArgs, Callback, Acc0).
+ {VInfo, QueryArgs2} = couch_util:with_db(
+ Db,
+ fun(WDb) ->
+ {ok, VInfo0, _Sig, QueryArgs1} =
+ couch_mrview_util:get_view(WDb, Design, ViewName, QueryArgs0),
+ {VInfo0, QueryArgs1}
+ end
+ ),
+ case is_reduce_view(VInfo) of
+ true ->
+ fabric_view_reduce:go(
+ Db,
+ Design,
+ View,
+ QueryArgs2,
+ Callback,
+ Acc0,
+ VInfo
+ );
+ false ->
+ fabric_view_map:go(Db, Design, View, QueryArgs2, Callback, Acc0)
+ end.
%% @doc retrieve info about a view group, disk size, language, whether compaction
%% is running and so forth
@@ -313,7 +332,7 @@ design_docs(DbName) ->
end_key = <<"_design0">>,
include_docs=true
},
- Callback = fun({total_and_offset, _, _}, []) ->
+ Callback = fun({meta, _}, []) ->
{ok, []};
({row, Props}, Acc) ->
case couch_util:get_value(id, Props) of
@@ -437,8 +456,8 @@ default_callback(complete, Acc) ->
default_callback(Row, Acc) ->
{ok, [Row | Acc]}.
-is_reduce_view(_, _, _, #mrargs{view_type=Reduce}) ->
- Reduce =:= reduce.
+is_reduce_view({Reduce, _, _}) ->
+ Reduce =:= red.
%% @doc convenience method for use in the shell, converts a keylist
%% to a `changes_args' record
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/1e41d801/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index 727845f..074beb2 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -97,46 +97,10 @@ map_view(DbName, DDoc, ViewName, Args) ->
VAcc0 = #vacc{db=Db},
couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0).
-reduce_view(DbName, #doc{} = DDoc, ViewName, QueryArgs) ->
- Group = couch_view_group:design_doc_to_view_group(DDoc),
- reduce_view(DbName, Group, ViewName, QueryArgs);
-reduce_view(DbName, Group0, ViewName, QueryArgs) ->
- erlang:put(io_priority, {interactive, DbName}),
+reduce_view(DbName, DDoc, ViewName, Args) ->
{ok, Db} = get_or_create_db(DbName, []),
- #mrargs{
- group_level = GroupLevel,
- limit = Limit,
- skip = Skip,
- keys = Keys,
- stale = Stale,
- extra = Extra
- } = QueryArgs,
- set_io_priority(DbName, Extra),
- GroupFun = group_rows_fun(GroupLevel),
- {LastSeq, MinSeq} = calculate_seqs(Db, Stale),
- {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
- {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
- maybe_update_view_group(Pid, LastSeq, Stale),
- Lang = couch_view_group:get_language(Group),
- Views = couch_view_group:get_views(Group),
- erlang:monitor(process, couch_view_group:get_fd(Group)),
- {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce),
- ReduceView = {reduce, NthRed, Lang, View},
- Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip},
- case Keys of
- undefined ->
- Options0 = couch_httpd_view:make_key_options(QueryArgs),
- Options = [{key_group_fun, GroupFun} | Options0],
- couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options);
- _ ->
- lists:map(fun(Key) ->
- KeyArgs = QueryArgs#mrargs{start_key=Key, end_key=Key},
- Options0 = couch_httpd_view:make_key_options(KeyArgs),
- Options = [{key_group_fun, GroupFun} | Options0],
- couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options)
- end, Keys)
- end,
- rexi:reply(complete).
+ VAcc0 = #vacc{db=Db},
+ couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0).
calculate_seqs(Db, Stale) ->
LastSeq = couch_db:get_update_seq(Db),
@@ -268,7 +232,7 @@ get_or_create_db(DbName, Options) ->
end.
-view_cb({meta, Meta}, #vacc{}=Acc) ->
+view_cb({meta, Meta}, Acc) ->
% Map function starting
Total = couch_util:get_value(total, Meta),
Offset = couch_util:get_value(offset, Meta),
@@ -281,7 +245,7 @@ view_cb({meta, Meta}, #vacc{}=Acc) ->
timeout ->
exit(timeout)
end;
-view_cb({row, Row}, #vacc{}=Acc) ->
+view_cb({row, Row}, Acc) ->
% Adding another row
ViewRow = #view_row{
id = couch_util:get_value(id, Row),
@@ -301,6 +265,28 @@ view_cb(complete, Acc) ->
{ok, Acc}.
+reduce_cb({meta, Meta}, Acc) ->
+ % Map function starting
+ Total = couch_util:get_value(total, Meta),
+ Offset = couch_util:get_value(offset, Meta),
+ case rexi:sync_reply({total_and_offset, Total, Offset}) of
+ ok ->
+ {ok, Acc};
+ stop ->
+ exit(normal);
+ timeout ->
+ exit(timeout)
+ end;
+reduce_cb({row, Row}, Acc) ->
+ % Adding another row
+ Key = couch_util:get_value(key, Row),
+ Value = couch_util:get_value(value, Row),
+ send(Key, Value, Acc);
+reduce_cb(complete, Acc) ->
+ % Finish view output
+ rexi:reply(complete),
+ {ok, Acc}.
+
view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) ->
% matches for _all_docs and translates #full_doc_info{} -> KV pair
case couch_doc:to_doc_info(FullDocInfo) of
@@ -397,13 +383,12 @@ reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) ->
reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0 ->
send(K, Red, Acc).
-
-send(Key, Value, #view_acc{limit=Limit} = Acc) ->
+send(Key, Value, Acc) ->
case put(fabric_sent_first_row, true) of
undefined ->
case rexi:sync_reply(#view_row{key=Key, value=Value}) of
ok ->
- {ok, Acc#view_acc{limit=Limit-1}};
+ {ok, Acc};
stop ->
exit(normal);
timeout ->
@@ -412,7 +397,7 @@ send(Key, Value, #view_acc{limit=Limit} = Acc) ->
true ->
case rexi:stream(#view_row{key=Key, value=Value}) of
ok ->
- {ok, Acc#view_acc{limit=Limit-1}};
+ {ok, Acc};
timeout ->
exit(timeout)
end
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/1e41d801/src/fabric_view.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view.erl b/src/fabric_view.erl
index 024b230..9fa6d5e 100644
--- a/src/fabric_view.erl
+++ b/src/fabric_view.erl
@@ -224,7 +224,10 @@ get_next_row(State) ->
Counters1 = fabric_dict:update_counter(Worker, -1, Counters0),
{Row, State#collector{rows = Rest, counters=Counters1}}.
+%% HACK: rectify nil <-> undefined discrepancies
find_next_key(nil, Dir, RowDict) ->
+ find_next_key(undefined, Dir, RowDict);
+find_next_key(undefined, Dir, RowDict) ->
case lists:sort(sort_fun(Dir), dict:fetch_keys(RowDict)) of
[] ->
throw(complete);
@@ -262,9 +265,9 @@ transform_row(#view_row{key=Key, id=Id, value=Value, doc=Doc}) ->
sort_fun(fwd) ->
- fun(A,A) -> true; (A,B) -> couch_ejson_compare:less(A,B) end;
+ fun(A,A) -> true; (A,B) -> couch_ejson_compare:less_json(A,B) end;
sort_fun(rev) ->
- fun(A,A) -> true; (A,B) -> couch_ejson_compare:less(B,A) end.
+ fun(A,A) -> true; (A,B) -> couch_ejson_compare:less_json(B,A) end.
extract_view(Pid, ViewName, [], _ViewType) ->
twig:log(error, "missing_named_view ~p", [ViewName]),
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/1e41d801/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index c922a7f..c21c8ab 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -12,29 +12,19 @@
-module(fabric_view_reduce).
--export([go/6]).
+-export([go/7]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
-go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
- {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
- go(DbName, DDoc, View, Args, Callback, Acc0);
-
-go(DbName, DDoc, VName, Args, Callback, Acc0) ->
- Group = couch_view_group:design_doc_to_view_group(DDoc),
- Lang = couch_view_group:get_language(Group),
- Views = couch_view_group:get_views(Group),
- {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce),
- {VName, RedSrc} = lists:nth(NthRed, View#mrview.reduce_funs),
- Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
- Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,DDoc,VName,Args]}),
- Shard#shard{ref = Ref}
- end, fabric_view:get_shards(DbName, Args)),
+go(DbName, DDoc, VName, Args, Callback, Acc0, {red, {_, Lang, _}, _}=VInfo) ->
+ Shards = fabric_view:get_shards(DbName, Args),
+ Workers = fabric_util:submit_jobs(Shards, reduce_view, [DDoc, VName, Args]),
+ RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
RexiMon = fabric_util:create_monitors(Workers),
- #mrargs{limit = Limit, skip = Skip} = Args,
+ #mrargs{limit = Limit, skip = Skip, keys = Keys} = Args,
OsProc = case os_proc_needed(RedSrc) of
true -> couch_query_servers:get_os_process(Lang);
_ -> nil
@@ -44,7 +34,7 @@ go(DbName, DDoc, VName, Args, Callback, Acc0) ->
query_args = Args,
callback = Callback,
counters = fabric_dict:init(Workers, 0),
- keys = Args#mrargs.keys,
+ keys = Keys,
skip = Skip,
limit = Limit,
lang = Lang,
@@ -84,6 +74,11 @@ handle_message({rexi_EXIT, Reason}, Worker, State) ->
{error, Resp}
end;
+%% TODO: what to do with this message?
+handle_message({total_and_offset, _, _}, {_Worker, From}, State) ->
+ gen_server:reply(From, ok),
+ {ok, State};
+
handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
#collector{counters = Counters0, rows = Rows0} = State,
case fabric_dict:lookup_element(Worker, Counters0) of