You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ch...@apache.org on 2014/03/28 19:45:05 UTC
[4/5] fabric commit: updated refs/heads/1993-bigcouch-couch-mrview to aeac2df
Update fabric_rpc to use couch_mrview
This modifies fabric_rpc to use couch_mrview for map views, reduce
views, and all-docs queries. This removes most of the view logic
from fabric, since it is now better handled in couch_mrview, and
substantially reduces the amount of code in these functions.
The new {view,reduce}_cb callbacks, which replace view_fold and
reduce_fold, use couch_mrview's callback message format: the
total_and_offset message is replaced by meta, and the row callbacks
now receive {row, Row} proplists.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/commit/efddaf1d
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/tree/efddaf1d
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-fabric/diff/efddaf1d
Branch: refs/heads/1993-bigcouch-couch-mrview
Commit: efddaf1d14b19187f31a2c5c19a40061b3635e9a
Parents: 28528ac
Author: Russell Branca <ch...@gmail.com>
Authored: Wed Mar 26 17:25:40 2014 -0700
Committer: Russell Branca <ch...@gmail.com>
Committed: Fri Mar 28 11:33:19 2014 -0700
----------------------------------------------------------------------
src/fabric_rpc.erl | 282 +++++++++-------------------------------
src/fabric_view_reduce.erl | 24 +---
2 files changed, 67 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/efddaf1d/src/fabric_rpc.erl
----------------------------------------------------------------------
diff --git a/src/fabric_rpc.erl b/src/fabric_rpc.erl
index cee3c55..d2e6486 100644
--- a/src/fabric_rpc.erl
+++ b/src/fabric_rpc.erl
@@ -27,38 +27,6 @@
%% rpc endpoints
%% call to with_db will supply your M:F with a #db{} and then remaining args
-all_docs(DbName, #mrargs{keys=undefined} = QueryArgs) ->
- {ok, Db} = get_or_create_db(DbName, []),
- #mrargs{
- start_key = StartKey,
- start_key_docid = StartDocId,
- end_key = EndKey,
- end_key_docid = EndDocId,
- limit = Limit,
- skip = Skip,
- include_docs = IncludeDocs,
- direction = Dir,
- inclusive_end = Inclusive,
- extra = Extra
- } = QueryArgs,
- set_io_priority(DbName, Extra),
- {ok, Total} = couch_db:get_doc_count(Db),
- Acc0 = #view_acc{
- db = Db,
- include_docs = IncludeDocs,
- conflicts = proplists:get_value(conflicts, Extra, false),
- limit = Limit+Skip,
- total_rows = Total
- },
- EndKeyType = if Inclusive -> end_key; true -> end_key_gt end,
- Options = [
- {dir, Dir},
- {start_key, if is_binary(StartKey) -> StartKey; true -> StartDocId end},
- {EndKeyType, if is_binary(EndKey) -> EndKey; true -> EndDocId end}
- ],
- {ok, _, Acc} = couch_db:enum_docs(Db, fun view_fold/3, Acc0, Options),
- final_response(Total, Acc#view_acc.offset).
-
changes(DbName, #changes_args{} = Args, StartSeq) ->
changes(DbName, [Args], StartSeq);
changes(DbName, Options, StartSeq) ->
@@ -80,104 +48,20 @@ changes(DbName, Options, StartSeq) ->
rexi:reply(Error)
end.
-map_view(DbName, DDoc, ViewName, QueryArgs) ->
+all_docs(DbName, #mrargs{keys=undefined} = Args) ->
{ok, Db} = get_or_create_db(DbName, []),
- #mrargs{
- limit = Limit,
- skip = Skip,
- keys = Keys,
- include_docs = IncludeDocs,
- stale = Stale,
- view_type = ViewType,
- extra = Extra
- } = QueryArgs,
- set_io_priority(DbName, Extra),
- {LastSeq, MinSeq} = calculate_seqs(Db, Stale),
- Group0 = couch_view_group:design_doc_to_view_group(DDoc),
- {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
- {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
- maybe_update_view_group(Pid, LastSeq, Stale),
- erlang:monitor(process, couch_view_group:get_fd(Group)),
- Views = couch_view_group:get_views(Group),
- View = fabric_view:extract_view(Pid, ViewName, Views, ViewType),
- {ok, Total} = couch_view:get_row_count(View),
- Acc0 = #view_acc{
- db = Db,
- include_docs = IncludeDocs,
- conflicts = proplists:get_value(conflicts, Extra, false),
- limit = Limit+Skip,
- total_rows = Total,
- reduce_fun = fun couch_view:reduce_to_count/1
- },
- case Keys of
- undefined ->
- Options = couch_httpd_view:make_key_options(QueryArgs),
- {ok, _, Acc} = couch_view:fold(View, fun view_fold/3, Acc0, Options);
- _ ->
- Acc = lists:foldl(fun(Key, AccIn) ->
- KeyArgs = QueryArgs#mrargs{start_key=Key, end_key=Key},
- Options = couch_httpd_view:make_key_options(KeyArgs),
- {_Go, _, Out} = couch_view:fold(View, fun view_fold/3, AccIn,
- Options),
- Out
- end, Acc0, Keys)
- end,
- final_response(Total, Acc#view_acc.offset).
+ VAcc0 = #vacc{db=Db},
+ couch_mrview:query_all_docs(Db, Args, fun view_cb/2, VAcc0).
-reduce_view(DbName, #doc{} = DDoc, ViewName, QueryArgs) ->
- Group = couch_view_group:design_doc_to_view_group(DDoc),
- reduce_view(DbName, Group, ViewName, QueryArgs);
-reduce_view(DbName, Group0, ViewName, QueryArgs) ->
- erlang:put(io_priority, {interactive, DbName}),
+map_view(DbName, DDoc, ViewName, Args) ->
{ok, Db} = get_or_create_db(DbName, []),
- #mrargs{
- group_level = GroupLevel,
- limit = Limit,
- skip = Skip,
- keys = Keys,
- stale = Stale,
- extra = Extra
- } = QueryArgs,
- set_io_priority(DbName, Extra),
- GroupFun = group_rows_fun(GroupLevel),
- {LastSeq, MinSeq} = calculate_seqs(Db, Stale),
- {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
- {ok, Group} = couch_view_group:request_group(Pid, MinSeq),
- maybe_update_view_group(Pid, LastSeq, Stale),
- Lang = couch_view_group:get_language(Group),
- Views = couch_view_group:get_views(Group),
- erlang:monitor(process, couch_view_group:get_fd(Group)),
- {NthRed, View} = fabric_view:extract_view(Pid, ViewName, Views, reduce),
- ReduceView = {reduce, NthRed, Lang, View},
- Acc0 = #view_acc{group_level = GroupLevel, limit = Limit+Skip},
- case Keys of
- undefined ->
- Options0 = couch_httpd_view:make_key_options(QueryArgs),
- Options = [{key_group_fun, GroupFun} | Options0],
- couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options);
- _ ->
- lists:map(fun(Key) ->
- KeyArgs = QueryArgs#mrargs{start_key=Key, end_key=Key},
- Options0 = couch_httpd_view:make_key_options(KeyArgs),
- Options = [{key_group_fun, GroupFun} | Options0],
- couch_view:fold_reduce(ReduceView, fun reduce_fold/3, Acc0, Options)
- end, Keys)
- end,
- rexi:reply(complete).
+ VAcc0 = #vacc{db=Db},
+ couch_mrview:query_view(Db, DDoc, ViewName, Args, fun view_cb/2, VAcc0).
-calculate_seqs(Db, Stale) ->
- LastSeq = couch_db:get_update_seq(Db),
- if
- Stale == ok orelse Stale == update_after ->
- {LastSeq, 0};
- true ->
- {LastSeq, LastSeq}
- end.
-
-maybe_update_view_group(GroupPid, LastSeq, update_after) ->
- couch_view_group:trigger_group_update(GroupPid, LastSeq);
-maybe_update_view_group(_, _, _) ->
- ok.
+reduce_view(DbName, DDoc, ViewName, Args) ->
+ {ok, Db} = get_or_create_db(DbName, []),
+ VAcc0 = #vacc{db=Db},
+ couch_mrview:query_view(Db, DDoc, ViewName, Args, fun reduce_cb/2, VAcc0).
create_db(DbName) ->
rexi:reply(case couch_server:create(DbName, []) of
@@ -252,9 +136,8 @@ update_docs(DbName, Docs0, Options) ->
Docs = make_att_readers(Docs0),
with_db(DbName, Options, {couch_db, update_docs, [Docs, Options, X]}).
-group_info(DbName, Group0) ->
- {ok, Pid} = gen_server:call(couch_view, {get_group_server, DbName, Group0}),
- rexi:reply(couch_view_group:request_group_info(Pid)).
+group_info(DbName, DDocId) ->
+ with_db(DbName, [], {couch_mrview, get_info, [DDocId]}).
reset_validation_funs(DbName) ->
case get_or_create_db(DbName, []) of
@@ -294,109 +177,64 @@ get_or_create_db(DbName, Options) ->
Else
end.
-view_fold(#full_doc_info{} = FullDocInfo, OffsetReds, Acc) ->
- % matches for _all_docs and translates #full_doc_info{} -> KV pair
- case couch_doc:to_doc_info(FullDocInfo) of
- #doc_info{id=Id, revs=[#rev_info{deleted=false, rev=Rev}|_]} = DI ->
- Value = {[{rev,couch_doc:rev_to_str(Rev)}]},
- view_fold({{Id,Id}, Value}, OffsetReds, Acc#view_acc{doc_info=DI});
- #doc_info{revs=[#rev_info{deleted=true}|_]} ->
- {ok, Acc}
- end;
-view_fold(KV, OffsetReds, #view_acc{offset=nil, total_rows=Total} = Acc) ->
- % calculates the offset for this shard
- #view_acc{reduce_fun=Reduce} = Acc,
- Offset = Reduce(OffsetReds),
- case rexi:sync_reply({total_and_offset, Total, Offset}) of
- ok ->
- view_fold(KV, OffsetReds, Acc#view_acc{offset=Offset});
- stop ->
- exit(normal);
- timeout ->
- exit(timeout)
+
+view_cb({meta, Meta}, Acc) ->
+ % Map function starting
+ case rexi:sync_reply({meta, Meta}) of
+ ok ->
+ {ok, Acc};
+ stop ->
+ exit(normal);
+ timeout ->
+ exit(timeout)
end;
-view_fold(_KV, _Offset, #view_acc{limit=0} = Acc) ->
- % we scanned through limit+skip local rows
- {stop, Acc};
-view_fold({{Key,Id}, Value}, _Offset, Acc) ->
- % the normal case
- #view_acc{
- db = Db,
- doc_info = DocInfo,
- limit = Limit,
- conflicts = Conflicts,
- include_docs = IncludeDocs
- } = Acc,
- case Value of {Props} ->
- LinkedDocs = (couch_util:get_value(<<"_id">>, Props) =/= undefined);
- _ ->
- LinkedDocs = false
- end,
- if LinkedDocs ->
- % we'll embed this at a higher level b/c the doc may be non-local
- Doc = undefined;
- IncludeDocs ->
- IdOrInfo = if DocInfo =/= nil -> DocInfo; true -> Id end,
- Options = if Conflicts -> [conflicts]; true -> [] end,
- case couch_db:open_doc(Db, IdOrInfo, Options) of
- {not_found, deleted} ->
- Doc = null;
- {not_found, missing} ->
- Doc = undefined;
- {ok, Doc0} ->
- Doc = couch_doc:to_json_obj(Doc0, [])
- end;
- true ->
- Doc = undefined
- end,
- case rexi:stream(#view_row{key=Key, id=Id, value=Value, doc=Doc}) of
+view_cb({row, Row}, Acc) ->
+ % Adding another row
+ ViewRow = #view_row{
+ id = couch_util:get_value(id, Row),
+ key = couch_util:get_value(key, Row),
+ value = couch_util:get_value(value, Row),
+ doc = couch_util:get_value(doc, Row)
+ },
+ case rexi:stream(ViewRow) of
ok ->
- {ok, Acc#view_acc{limit=Limit-1}};
+ {ok, Acc};
timeout ->
exit(timeout)
- end.
-
-final_response(Total, nil) ->
- case rexi:sync_reply({total_and_offset, Total, Total}) of ok ->
- rexi:reply(complete);
- stop ->
- ok;
- timeout ->
- exit(timeout)
end;
-final_response(_Total, _Offset) ->
- rexi:reply(complete).
-
-%% TODO: handle case of bogus group level
-group_rows_fun(exact) ->
- fun({Key1,_}, {Key2,_}) -> Key1 == Key2 end;
-group_rows_fun(0) ->
- fun(_A, _B) -> true end;
-group_rows_fun(GroupLevel) when is_integer(GroupLevel) ->
- fun({[_|_] = Key1,_}, {[_|_] = Key2,_}) ->
- lists:sublist(Key1, GroupLevel) == lists:sublist(Key2, GroupLevel);
- ({Key1,_}, {Key2,_}) ->
- Key1 == Key2
- end.
+view_cb(complete, Acc) ->
+ % Finish view output
+ rexi:reply(complete),
+ {ok, Acc}.
-reduce_fold(_Key, _Red, #view_acc{limit=0} = Acc) ->
- {stop, Acc};
-reduce_fold(_Key, Red, #view_acc{group_level=0} = Acc) ->
- send(null, Red, Acc);
-reduce_fold(Key, Red, #view_acc{group_level=exact} = Acc) ->
- send(Key, Red, Acc);
-reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0, is_list(K) ->
- send(lists:sublist(K, I), Red, Acc);
-reduce_fold(K, Red, #view_acc{group_level=I} = Acc) when I > 0 ->
- send(K, Red, Acc).
-
-send(Key, Value, #view_acc{limit=Limit} = Acc) ->
+reduce_cb({meta, Meta}, Acc) ->
+ % Map function starting
+ case rexi:sync_reply({meta, Meta}) of
+ ok ->
+ {ok, Acc};
+ stop ->
+ exit(normal);
+ timeout ->
+ exit(timeout)
+ end;
+reduce_cb({row, Row}, Acc) ->
+ % Adding another row
+ Key = couch_util:get_value(key, Row),
+ Value = couch_util:get_value(value, Row),
+ send(Key, Value, Acc);
+reduce_cb(complete, Acc) ->
+ % Finish view output
+ rexi:reply(complete),
+ {ok, Acc}.
+
+
+send(Key, Value, Acc) ->
case put(fabric_sent_first_row, true) of
undefined ->
case rexi:sync_reply(#view_row{key=Key, value=Value}) of
ok ->
- {ok, Acc#view_acc{limit=Limit-1}};
+ {ok, Acc};
stop ->
exit(normal);
timeout ->
@@ -405,7 +243,7 @@ send(Key, Value, #view_acc{limit=Limit} = Acc) ->
true ->
case rexi:stream(#view_row{key=Key, value=Value}) of
ok ->
- {ok, Acc#view_acc{limit=Limit-1}};
+ {ok, Acc};
timeout ->
exit(timeout)
end
http://git-wip-us.apache.org/repos/asf/couchdb-fabric/blob/efddaf1d/src/fabric_view_reduce.erl
----------------------------------------------------------------------
diff --git a/src/fabric_view_reduce.erl b/src/fabric_view_reduce.erl
index c922a7f..d2ea464 100644
--- a/src/fabric_view_reduce.erl
+++ b/src/fabric_view_reduce.erl
@@ -12,29 +12,19 @@
-module(fabric_view_reduce).
--export([go/6]).
+-export([go/7]).
-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("couch_mrview/include/couch_mrview.hrl").
-go(DbName, GroupId, View, Args, Callback, Acc0) when is_binary(GroupId) ->
- {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
- go(DbName, DDoc, View, Args, Callback, Acc0);
-
-go(DbName, DDoc, VName, Args, Callback, Acc0) ->
- Group = couch_view_group:design_doc_to_view_group(DDoc),
- Lang = couch_view_group:get_language(Group),
- Views = couch_view_group:get_views(Group),
- {NthRed, View} = fabric_view:extract_view(nil, VName, Views, reduce),
- {VName, RedSrc} = lists:nth(NthRed, View#mrview.reduce_funs),
- Workers = lists:map(fun(#shard{name=Name, node=N} = Shard) ->
- Ref = rexi:cast(N, {fabric_rpc, reduce_view, [Name,DDoc,VName,Args]}),
- Shard#shard{ref = Ref}
- end, fabric_view:get_shards(DbName, Args)),
+go(DbName, DDoc, VName, Args, Callback, Acc0, {red, {_, Lang, _}, _}=VInfo) ->
+ Shards = fabric_view:get_shards(DbName, Args),
+ Workers = fabric_util:submit_jobs(Shards, reduce_view, [DDoc, VName, Args]),
+ RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
RexiMon = fabric_util:create_monitors(Workers),
- #mrargs{limit = Limit, skip = Skip} = Args,
+ #mrargs{limit = Limit, skip = Skip, keys = Keys} = Args,
OsProc = case os_proc_needed(RedSrc) of
true -> couch_query_servers:get_os_process(Lang);
_ -> nil
@@ -44,7 +34,7 @@ go(DbName, DDoc, VName, Args, Callback, Acc0) ->
query_args = Args,
callback = Callback,
counters = fabric_dict:init(Workers, 0),
- keys = Args#mrargs.keys,
+ keys = Keys,
skip = Skip,
limit = Limit,
lang = Lang,