You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2014/12/04 21:09:39 UTC
[08/22] couch commit: updated refs/heads/2491-refactor-couch-httpd-auth to 3e8286d
Add view filtering optimization to changes feeds
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/20e585dd
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/20e585dd
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/20e585dd
Branch: refs/heads/2491-refactor-couch-httpd-auth
Commit: 20e585dd07e4da0fe67ace09f0a291e3fe759534
Parents: 6d6b801
Author: Benjamin Bastian <be...@gmail.com>
Authored: Fri Aug 22 23:56:56 2014 +0700
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Fri Oct 31 12:43:53 2014 -0700
----------------------------------------------------------------------
src/couch_changes.erl | 181 ++++++++++++++++++++++++---------------
src/couch_httpd_changes.erl | 180 +-------------------------------------
2 files changed, 116 insertions(+), 245 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/20e585dd/src/couch_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_changes.erl b/src/couch_changes.erl
index abe8cf9..259f83c 100644
--- a/src/couch_changes.erl
+++ b/src/couch_changes.erl
@@ -63,25 +63,45 @@ handle_changes(Args1, Req, Db0, Type) ->
dir = Dir,
since = Since
} = Args1,
- {StartListenerFun, DDocName, ViewName, View} = case Type of
- {view, DDocName0, ViewName0} ->
- SNFun = fun() ->
- couch_event:link_listener(
- ?MODULE, handle_view_event, self(), [{dbname, Db0#db.name}]
- )
- end,
- {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view(Db0#db.name, DDocName0, ViewName0, #mrargs{}),
- {SNFun, DDocName0, ViewName0, View0};
- db ->
- SNFun = fun() ->
- couch_event:link_listener(
- ?MODULE, handle_db_event, self(), [{dbname, Db0#db.name}]
- )
- end,
- {SNFun, undefined, undefined, undefined}
- end,
Filter = configure_filter(FilterName, Style, Req, Db0),
Args = Args1#changes_args{filter_fun = Filter},
+ UseViewChanges = case {Type, Filter} of
+ {{view, _, _}, _} ->
+ true;
+ {_, {fast_view, _, _, _}} ->
+ true;
+ _ ->
+ false
+ end,
+ {StartListenerFun, DDocName, ViewName, View} = if UseViewChanges ->
+ {DDocName0, ViewName0} = case {Type, Filter} of
+ {{view, DDocName1, ViewName1}, _} ->
+ {DDocName1, ViewName1};
+ {_, {fast_view, _, DDoc, ViewName1}} ->
+ {DDoc#doc.id, ViewName1}
+ end,
+ {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view(
+ Db0#db.name, DDocName0, ViewName0, #mrargs{}),
+ case View0#mrview.seq_btree of
+ #btree{} ->
+ ok;
+ _ ->
+ throw({bad_request, "view changes not enabled"})
+ end,
+ SNFun = fun() ->
+ couch_event:link_listener(
+ ?MODULE, handle_view_event, {self(), DDocName0}, [{dbname, Db0#db.name}]
+ )
+ end,
+ {SNFun, DDocName0, ViewName0, View0};
+ true ->
+ SNFun = fun() ->
+ couch_event:link_listener(
+ ?MODULE, handle_db_event, self(), [{dbname, Db0#db.name}]
+ )
+ end,
+ {SNFun, undefined, undefined, undefined}
+ end,
Start = fun() ->
{ok, Db} = couch_db:reopen(Db0),
StartSeq = case Dir of
@@ -180,7 +200,15 @@ configure_filter("_view", Style, Req, Db) ->
[DName, VName] ->
{ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
check_member_exists(DDoc, [<<"views">>, VName]),
- {view, Style, DDoc, VName};
+ try
+ true = couch_util:get_nested_json_value(
+ DDoc#doc.body,
+ [<<"options">>, <<"seq_indexed">>]
+ ),
+ {fast_view, Style, DDoc, VName}
+ catch _:_ ->
+ {view, Style, DDoc, VName}
+ end;
[] ->
Msg = "`view` must be of the form `designname/viewname`",
throw({bad_request, Msg})
@@ -237,6 +265,36 @@ filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) ->
{ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
filter_revs(Passes, Docs).
+fast_view_filter(Db, {{Seq, _}, {ID, _}} = KV, {fast_view, Style, _, _}) ->
+    case couch_db:get_doc_info(Db, ID) of
+        {ok, #doc_info{high_seq=Seq}=DocInfo} -> % row is the doc's latest change
+            Docs = open_revs(Db, DocInfo, Style),
+            Changes = lists:map(fun(#doc{revs={RevPos, [RevId | _]}}) ->
+                RevStr = couch_doc:rev_to_str({RevPos, RevId}),
+                {[{<<"rev">>, RevStr}]}
+            end, Docs),
+            {DocInfo, Changes};
+        {ok, #doc_info{high_seq=HighSeq}} when Seq < HighSeq ->
+            % The view seq tree is behind the db (it is out of date, or it was
+            % opened before the db), so this row refers to a revision of the
+            % document that has since been superseded. Don't send that old
+            % revision: the current one comes by once the up-to-date seq tree
+            % is reopened and the fold continues. Only Seq < HighSeq is skipped;
+            % a view seq newer than the db's high_seq is unexpected and is left
+            % to fail with a case_clause error. Return the row itself (not a
+            % bare `ok`) so changes_enumerator can still take Seq from it and
+            % move past the entry; a doc that is no longer found is skipped alike.
+            {KV, []};
+        {error, not_found} ->
+            {KV, []}
+    end.
+
+
+
+view_filter(_Db, _KV, {default, _Style}) ->
+ [ok]. % TODO: placeholder; only the default filter is matched and every view row passes (no real view filtering yet)
+
+
get_view_qs({json_req, {Props}}) ->
{Query} = couch_util:get_value(<<"query">>, Props, {[]}),
binary_to_list(couch_util:get_value(<<"view">>, Query, ""));
@@ -477,7 +535,7 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
db = Db, callback = Callback,
timeout = Timeout, timeout_fun = TimeoutFun, seq = EndSeq,
prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit,
- ddoc_name = DDocName, view_name = ViewName, view = View
+ ddoc_name = DDocName, view_name = ViewName
} = ChangesAcc,
couch_db:close(Db),
@@ -517,54 +575,27 @@ maybe_refresh_view(Db, DDocName, ViewName) ->
end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
Callback({stop, EndSeq}, ResponseType, UserAcc).
-changes_enumerator(Value, #changes_acc{resp_type = ResponseType} = Acc)
- when ResponseType =:= "continuous"
- orelse ResponseType =:= "eventsource" ->
- #changes_acc{
- filter = Filter, callback = Callback,
- user_acc = UserAcc, limit = Limit, db = Db,
- timeout = Timeout, timeout_fun = TimeoutFun,
- view = View
- } = Acc,
- {Seq, Results0} = case View of
- undefined ->
- {Value#doc_info.high_seq, filter(Db, Value, Filter)};
- #mrview{} ->
- {{Seq0, _}, _} = Value,
- {Seq0, [ok]} % TODO
- end,
- Results = [Result || Result <- Results0, Result /= null],
- %% TODO: I'm thinking this should be < 1 and not =< 1
- Go = if Limit =< 1 -> stop; true -> ok end,
- case Results of
- [] ->
- {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
- case Done of
- stop ->
- {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
- ok ->
- {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
- end;
- _ ->
- ChangesRow = changes_row(Results, Value, Acc),
- UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
- reset_heartbeat(),
- {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}}
- end;
-changes_enumerator(Value, Acc) ->
+changes_enumerator(Value0, Acc) ->
#changes_acc{
filter = Filter, callback = Callback, prepend = Prepend,
user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
timeout = Timeout, timeout_fun = TimeoutFun, view = View
} = Acc,
- {Seq, Results0} = case View of
- undefined ->
- {Value#doc_info.high_seq, filter(Db, Value, Filter)};
- #mrview{} ->
- {{Seq0,_}, _} = Value,
- {Seq0, [ok]} % TODO view filter
+ {Value, Results0} = case {View, Filter} of
+ {_, {fast_view, _, _, _}} ->
+ fast_view_filter(Db, Value0, Filter);
+ {#mrview{}, _} ->
+ {Value0, view_filter(Db, Value0, Filter)};
+ {_, _} ->
+ {Value0, filter(Db, Value0, Filter)}
end,
Results = [Result || Result <- Results0, Result /= null],
+ Seq = case Value of
+ #doc_info{} ->
+ Value#doc_info.high_seq;
+ {{Seq0, _}, _} ->
+ Seq0
+ end,
Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
case Results of
[] ->
@@ -576,20 +607,32 @@ changes_enumerator(Value, Acc) ->
{Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
end;
_ ->
- ChangesRow = changes_row(Results, Value, Acc),
- UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
- reset_heartbeat(),
- {Go, Acc#changes_acc{
- seq = Seq, prepend = <<",\n">>,
- user_acc = UserAcc2, limit = Limit - 1}}
+ if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" ->
+ ChangesRow = changes_row(Results, Value, Acc),
+ UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
+ reset_heartbeat(),
+ {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}};
+ true ->
+ ChangesRow = changes_row(Results, Value, Acc),
+ UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+ reset_heartbeat(),
+ {Go, Acc#changes_acc{
+ seq = Seq, prepend = <<",\n">>,
+ user_acc = UserAcc2, limit = Limit - 1}}
+ end
end.
-changes_row(Results, SeqStuff, #changes_acc{view=#mrview{}}) ->
- {{Seq, Key}, {Id, Value}} = SeqStuff,
+changes_row(Results, DocInfo, #changes_acc{filter={fast_view,_,_,_}}=Acc) ->
+ format_doc_info_change(Results, DocInfo, Acc);
+changes_row(Results, KV, #changes_acc{view=#mrview{}}) ->
+ {{Seq, Key}, {Id, Value}} = KV,
{[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"key">>, Key}, {<<"value">>, Value}, {<<"changes">>, Results}]};
-changes_row(Results, #doc_info{}=DocInfo, #changes_acc{view=undefined}=Acc) ->
+changes_row(Results, #doc_info{}=DocInfo, Acc) ->
+ format_doc_info_change(Results, DocInfo, Acc).
+
+format_doc_info_change(Results, #doc_info{}=DocInfo, Acc) ->
#doc_info{
id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]
} = DocInfo,
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/20e585dd/src/couch_httpd_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_httpd_changes.erl b/src/couch_httpd_changes.erl
index 9f141d9..4963a5f 100644
--- a/src/couch_httpd_changes.erl
+++ b/src/couch_httpd_changes.erl
@@ -14,19 +14,13 @@
-export([handle_db_changes_req/2,
handle_changes_req/4,
- handle_view_filtered_changes/3,
- parse_changes_query/3]).
+ parse_changes_query/2]).
-include_lib("couch/include/couch_db.hrl").
handle_db_changes_req(Req, Db) ->
- ChangesArgs = parse_changes_query(Req, Db, false),
- ChangesFun = case ChangesArgs#changes_args.filter of
- "_view" ->
- handle_view_filtered_changes(ChangesArgs, Req, Db);
- _ ->
- couch_changes:handle_db_changes(ChangesArgs, Req, Db)
- end,
+ ChangesArgs = parse_changes_query(Req, Db),
+ ChangesFun = couch_changes:handle_db_changes(ChangesArgs, Req, Db),
handle_changes_req(Req, Db, ChangesArgs, ChangesFun).
handle_changes_req(#httpd{method='POST'}=Req, Db, ChangesArgs, ChangesFun) ->
@@ -126,170 +120,7 @@ handle_changes_req1(Req, #db{name=DbName}=Db, ChangesArgs, ChangesFun) ->
end.
-%% wrapper around couch_mrview_changes.
-%% This wrapper mimic couch_changes:handle_db_changes/3 and return a
-%% Changefun that can be used by the handle_changes_req function. Also
-%% while couch_mrview_changes:handle_changes/6 is returning tha view
-%% changes this function return docs corresponding to the changes
-%% instead so it can be used to replace the _view filter.
-handle_view_filtered_changes(ChangesArgs, Req, Db) ->
- %% parse view parameter
- {DDocId, VName} = parse_view_param(Req),
-
- %% get view options
- Query = case Req of
- {json_req, {Props}} ->
- {Q} = couch_util:get_value(<<"query">>, Props, {[]}),
- Q;
- _ ->
- couch_httpd:qs(Req)
- end,
- ViewOptions = parse_view_options(Query, []),
-
- {ok, Infos} = couch_mrview:get_info(Db, DDocId),
- case lists:member(<<"seq_indexed">>,
- proplists:get_value(update_options, Infos, [])) of
- true ->
- handle_view_filtered_changes(Db, DDocId, VName, ViewOptions, ChangesArgs,
- Req);
- false when ViewOptions /= [] ->
- ?LOG_ERROR("Tried to filter a non sequence indexed view~n",[]),
- throw({bad_request, seqs_not_indexed});
- false ->
- %% old method we are getting changes using the btree instead
- %% which is not efficient, log it
- ?LOG_WARN("Get view changes with seq_indexed=false.~n", []),
- couch_changes:handle_db_changes(ChangesArgs, Req, Db)
- end.
-
-handle_view_filtered_changes(#db{name=DbName}=Db0, DDocId, VName, ViewOptions,
- ChangesArgs, Req) ->
- #changes_args{
- feed = ResponseType,
- since = Since,
- db_open_options = DbOptions} = ChangesArgs,
-
- Options0 = [{since, Since},
- {view_options, ViewOptions}],
- Options = case ResponseType of
- "continuous" -> [stream | Options0];
- "eventsource" -> [stream | Options0];
- "longpoll" -> [{stream, once} | Options0];
- _ -> Options0
- end,
-
- %% reopen the db with the db options given to the changes args
- couch_db:close(Db0),
- DbOptions1 = [{user_ctx, Db0#db.user_ctx} | DbOptions],
- {ok, Db} = couch_db:open(DbName, DbOptions1),
-
-
- %% initialise the changes fun
- ChangesFun = fun(Callback) ->
- Callback(start, ResponseType),
-
- Acc0 = {"", 0, Db, Callback, ChangesArgs},
- couch_mrview_changes:handle_changes(DbName, DDocId, VName,
- fun view_changes_cb/2,
- Acc0, Options)
- end,
- ChangesFun.
-
-
-view_changes_cb(stop, {LastSeq, {_, _, _, Callback, Args}}) ->
- Callback({stop, LastSeq}, Args#changes_args.feed);
-
-view_changes_cb(heartbeat, {_, _, _, Callback, Args}=Acc) ->
- Callback(timeout, Args#changes_args.feed),
- {ok, Acc};
-view_changes_cb({{Seq, _Key, DocId}, Val},
- {Prepend, OldLimit, Db0, Callback, Args}=Acc) ->
-
- %% is the key removed from the index?
- Removed = case Val of
- {[{<<"_removed">>, true}]} -> true;
- _ -> false
- end,
-
- #changes_args{
- feed = ResponseType,
- limit = Limit} = Args,
-
- %% if the doc sequence is > to the one in the db record, reopen the
- %% database since it means we don't have the latest db value.
- Db = case Db0#db.update_seq >= Seq of
- true -> Db0;
- false ->
- {ok, Db1} = couch_db:reopen_db(Db0),
- Db1
- end,
-
- case couch_db:get_doc_info(Db, DocId) of
- {ok, DocInfo} ->
- %% get change row
- {Deleted, ChangeRow} = view_change_row(Db, DocInfo, Args),
-
- case Removed of
- true when Deleted /= true ->
- %% the key has been removed from the view but the
- %% document hasn't been deleted so ignore it.
- {ok, Acc};
- _ ->
- %% emit change row
- Callback({change, ChangeRow, Prepend}, ResponseType),
-
- %% if we achieved the limit, stop here, else continue.
- NewLimit = OldLimit + 1,
- if Limit > NewLimit ->
- {ok, {<<",\n">>, NewLimit, Db, Callback, Args}};
- true ->
- {stop, {<<"">>, NewLimit, Db, Callback, Args}}
- end
- end;
- {error, not_found} ->
- %% doc not found, continue
- {ok, Acc};
- Error ->
- throw(Error)
- end.
-
-
-view_change_row(Db, DocInfo, Args) ->
- #doc_info{id = Id, high_seq = Seq, revs = Revs} = DocInfo,
- [#rev_info{rev=Rev, deleted=Del} | _] = Revs,
-
- #changes_args{style=Style,
- include_docs=InDoc,
- doc_options = DocOpts,
- conflicts=Conflicts}=Args,
-
- Changes = case Style of
- main_only ->
- [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
- all_docs ->
- [{[{<<"rev">>, couch_doc:rev_to_str(R)}]}
- || #rev_info{rev=R} <- Revs]
- end,
-
- {Del, {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Changes}] ++
- deleted_item(Del) ++ case InDoc of
- true ->
- Opts = case Conflicts of
- true -> [deleted, conflicts];
- false -> [deleted]
- end,
- Doc = couch_index_util:load_doc(Db, DocInfo, Opts),
- case Doc of
- null ->
- [{doc, null}];
- _ ->
- [{doc, couch_doc:to_json_obj(Doc, DocOpts)}]
- end;
- false ->
- []
- end}}.
-
-parse_changes_query(Req, Db, IsViewChanges) ->
+parse_changes_query(Req, Db) ->
ChangesArgs = lists:foldl(fun({Key, Value}, Args) ->
case {string:to_lower(Key), Value} of
{"feed", _} ->
@@ -426,6 +257,3 @@ parse_json(V) when is_list(V) ->
?JSON_DECODE(V);
parse_json(V) ->
V.
-
-deleted_item(true) -> [{<<"deleted">>, true}];
-deleted_item(_) -> [].