You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2014/12/04 21:09:35 UTC
[04/22] couch commit: updated
refs/heads/2491-refactor-couch-httpd-auth to 3e8286d
add support for view changes in the _changes API
Now when the option `seq_indexed=true` is set in the design doc, the
view filter in _changes will use it to retrieve the results. Compared to
the current way, using a view index will be faster to retrieve changes.
It also gives the possibility to filter changes by key or get changes in
a key range. All the view options can be used.
Note 1: if someone tries to filter changes with view options when
the views are not indexed by sequence, a 400 error will be returned.
Note 2: The changes will only be returned when the view is updated if
seq_indexed=true
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/7f2af21e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/7f2af21e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/7f2af21e
Branch: refs/heads/2491-refactor-couch-httpd-auth
Commit: 7f2af21e573abcd80b8c8412332d6439b4a777b3
Parents: fcb2882
Author: benoitc <bc...@gmail.com>
Authored: Fri Feb 7 15:38:34 2014 +0100
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Fri Oct 31 12:43:52 2014 -0700
----------------------------------------------------------------------
src/couch_httpd_changes.erl | 250 ++++++++++++++++++++++++++++++++++++++-
1 file changed, 246 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7f2af21e/src/couch_httpd_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_httpd_changes.erl b/src/couch_httpd_changes.erl
index 1e431e9..56ce559 100644
--- a/src/couch_httpd_changes.erl
+++ b/src/couch_httpd_changes.erl
@@ -12,7 +12,9 @@
-module(couch_httpd_changes).
--export([handle_changes_req/2]).
+-export([handle_changes_req/2,
+ handle_changes/3,
+ handle_view_changes/3]).
-include_lib("couch/include/couch_db.hrl").
@@ -34,9 +36,7 @@ handle_changes_req1(Req, #db{name=DbName}=Db) ->
% on other databases, _changes is free for all.
ok
end,
- handle_changes_req2(Req, Db).
-handle_changes_req2(Req, Db) ->
MakeCallback = fun(Resp) ->
fun({change, {ChangeProp}=Change, _}, "eventsource") ->
Seq = proplists:get_value(<<"seq">>, ChangeProp),
@@ -72,7 +72,7 @@ handle_changes_req2(Req, Db) ->
end
end,
ChangesArgs = parse_changes_query(Req, Db),
- ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db),
+ ChangesFun = handle_changes(ChangesArgs, Req, Db),
WrapperFun = case ChangesArgs#changes_args.feed of
"normal" ->
{ok, Info} = couch_db:get_db_info(Db),
@@ -116,6 +116,164 @@ handle_changes_req2(Req, Db) ->
)
end.
+
+%% Entry point for building the changes fun: requests using the "_view"
+%% filter go through handle_view_changes/3, everything else is delegated
+%% unchanged to couch_changes:handle_changes/3.
+handle_changes(ChangesArgs, Req, Db) ->
+    IsViewFilter = ChangesArgs#changes_args.filter =:= "_view",
+    case IsViewFilter of
+        true ->
+            handle_view_changes(ChangesArgs, Req, Db);
+        false ->
+            couch_changes:handle_changes(ChangesArgs, Req, Db)
+    end.
+
+%% Wrapper around couch_mrview_changes.
+%% This wrapper mimics couch_changes:handle_changes/3 and returns a
+%% changes fun that can be used by the handle_changes_req function.
+%% While couch_mrview_changes:handle_changes/6 returns the view changes,
+%% this function returns the docs corresponding to those changes, so it
+%% can be used to replace the _view filter.
+%%
+%% The view index is only used when the design doc's update options
+%% contain `seq_indexed`. Otherwise the request falls back to
+%% couch_changes:handle_changes/3, or is rejected with
+%% `{bad_request, seqs_not_indexed}` when view options were supplied.
+handle_view_changes(ChangesArgs, Req, Db) ->
+    %% design doc id and view name come from the `view` parameter
+    {DDocId, VName} = parse_view_param(Req),
+
+    %% raw view options: the "query" member of a JSON request, or the
+    %% query string of a regular HTTP request
+    RawOptions = case Req of
+        {json_req, {ReqProps}} ->
+            {JsonQuery} = couch_util:get_value(<<"query">>, ReqProps, {[]}),
+            JsonQuery;
+        _ ->
+            couch_httpd:qs(Req)
+    end,
+    ViewOptions = parse_view_options(RawOptions, []),
+
+    {ok, Infos} = couch_mrview:get_info(Db, DDocId),
+    UpdateOptions = proplists:get_value(update_options, Infos, []),
+    SeqIndexed = lists:member(<<"seq_indexed">>, UpdateOptions),
+    case {SeqIndexed, ViewOptions} of
+        {true, _} ->
+            handle_view_changes(Db, DDocId, VName, ViewOptions, ChangesArgs,
+                                Req);
+        {false, []} ->
+            %% old method: changes are read using the btree instead,
+            %% which is not efficient, so log it
+            ?LOG_WARN("Get view changes with seq_indexed=false.~n", []),
+            couch_changes:handle_changes(ChangesArgs, Req, Db);
+        {false, _} ->
+            ?LOG_ERROR("Tried to filter a non sequence indexed view~n",[]),
+            throw({bad_request, seqs_not_indexed})
+    end.
+
+%% Builds the changes fun for a sequence-indexed view.
+%% The feed type selects the streaming option handed to
+%% couch_mrview_changes:handle_changes/6; the database is closed and
+%% opened again with the db options carried by the changes args. The
+%% returned fun takes the response callback, signals `start`, and then
+%% runs the view changes with view_changes_cb/2 as the row callback.
+handle_view_changes(#db{name=DbName}=Db0, DDocId, VName, ViewOptions,
+                    ChangesArgs, _Req) ->
+    #changes_args{
+        feed = Feed,
+        since = Since,
+        db_open_options = DbOptions} = ChangesArgs,
+
+    BaseOptions = [{since, Since}, {view_options, ViewOptions}],
+    Options = if
+        Feed =:= "continuous"; Feed =:= "eventsource" ->
+            [stream | BaseOptions];
+        Feed =:= "longpoll" ->
+            [{stream, once} | BaseOptions];
+        true ->
+            BaseOptions
+    end,
+
+    %% reopen the db with the db options given to the changes args,
+    %% keeping the user context of the handle we were given
+    couch_db:close(Db0),
+    {ok, Db} = couch_db:open(DbName,
+                             [{user_ctx, Db0#db.user_ctx} | DbOptions]),
+
+    fun(Callback) ->
+        Callback(start, Feed),
+
+        %% accumulator: {Prepend, EmittedCount, Db, Callback, ChangesArgs}
+        InitialAcc = {"", 0, Db, Callback, ChangesArgs},
+        couch_mrview_changes:handle_changes(DbName, DDocId, VName,
+                                            fun view_changes_cb/2,
+                                            InitialAcc, Options)
+    end.
+
+
+%% Row callback passed to couch_mrview_changes:handle_changes/6.
+%%
+%% The accumulator is always `{Prepend, Count, Db, Callback, Args}`:
+%%   Prepend  - separator emitted in front of the next change row
+%%   Count    - number of change rows emitted so far
+%%   Db       - database handle used to look up doc infos
+%%   Callback - response callback of the changes request
+%%   Args     - the #changes_args{} of the request
+%%
+%% Clauses:
+%%   stop      - forwards `{stop, LastSeq}` to the response callback.
+%%   heartbeat - forwards `timeout` to the response callback.
+%%   view row  - emits the change row of the doc behind the row and
+%%               returns `{ok, Acc}` to continue or `{stop, Acc}` once
+%%               `limit` rows were emitted. Docs that are not found are
+%%               skipped; any other lookup result is thrown.
+view_changes_cb(stop, {LastSeq, {_, _, _, Callback, Args}}) ->
+    Callback({stop, LastSeq}, Args#changes_args.feed);
+
+view_changes_cb(heartbeat, {_, _, _, Callback, Args}=Acc) ->
+    Callback(timeout, Args#changes_args.feed),
+    {ok, Acc};
+view_changes_cb({{Seq, _Key, DocId}, _Val},
+                {Prepend, Count, Db0, Callback, Args}) ->
+
+    #changes_args{
+        feed = ResponseType,
+        limit = Limit} = Args,
+
+    %% if the doc sequence is greater than the one in the db record,
+    %% reopen the database since it means we don't have the latest db
+    %% value.
+    Db = case Db0#db.update_seq >= Seq of
+        true -> Db0;
+        false ->
+            {ok, Db1} = couch_db:reopen_db(Db0),
+            Db1
+    end,
+
+    case couch_db:get_doc_info(Db, DocId) of
+        {ok, DocInfo} ->
+            %% get change row
+            ChangeRow = view_change_row(Db, DocInfo, Args),
+            %% emit change row
+            Callback({change, ChangeRow, Prepend}, ResponseType),
+
+            %% if we reached the limit, stop here, else continue.
+            %% The accumulator keeps the same element order as the one
+            %% matched in the function head (count before db handle).
+            NewCount = Count + 1,
+            if Limit > NewCount ->
+                {ok, {<<",\n">>, NewCount, Db, Callback, Args}};
+            true ->
+                {stop, {<<"">>, NewCount, Db, Callback, Args}}
+            end;
+        {error, not_found} ->
+            %% doc not found, continue; keep the (possibly reopened) db
+            %% handle so it is not reopened again for the next row
+            {ok, {Prepend, Count, Db, Callback, Args}};
+        Error ->
+            throw(Error)
+    end.
+
+
+%% Builds the EJSON change row for one doc: `seq`, `id` and `changes`,
+%% followed by `deleted` when the winning revision is deleted and by
+%% `doc` when include_docs is set. `style` decides whether only the
+%% first revision (main_only) or every revision (all_docs) is listed.
+view_change_row(Db, DocInfo, Args) ->
+    #doc_info{id = Id, high_seq = Seq, revs = Revs} = DocInfo,
+    [#rev_info{rev=Rev, deleted=Del} | _] = Revs,
+
+    #changes_args{style=Style,
+                  include_docs=InDoc,
+                  doc_options=DocOpts,
+                  conflicts=Conflicts} = Args,
+
+    RevEntry = fun(R) -> {[{<<"rev">>, couch_doc:rev_to_str(R)}]} end,
+    Changes = case Style of
+        main_only ->
+            [RevEntry(Rev)];
+        all_docs ->
+            [RevEntry(R) || #rev_info{rev=R} <- Revs]
+    end,
+
+    %% optional `doc` member, loaded only when include_docs was requested
+    DocMember = case InDoc of
+        true ->
+            LoadOpts = case Conflicts of
+                true -> [deleted, conflicts];
+                false -> [deleted]
+            end,
+            case couch_index_util:load_doc(Db, DocInfo, LoadOpts) of
+                null ->
+                    [{doc, null}];
+                Doc ->
+                    [{doc, couch_doc:to_json_obj(Doc, DocOpts)}]
+            end;
+        false ->
+            []
+    end,
+
+    {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Changes}]
+        ++ deleted_item(Del) ++ DocMember}.
+
parse_changes_query(Req, Db) ->
ChangesArgs = lists:foldl(fun({Key, Value}, Args) ->
case {string:to_lower(Key), Value} of
@@ -172,3 +330,87 @@ parse_changes_query(Req, Db) ->
_ ->
ChangesArgs
end.
+
+%% Extracts the `view` parameter of the request and returns
+%% `{DDocId, ViewName}` via parse_view_param1/1.
+%% For a `{json_req, {Props}}` request the parameter is read from the
+%% "query" member; otherwise it is read from the query string.
+%% Throws `{bad_request, _}` (from parse_view_param1/1) when the
+%% parameter is missing or malformed.
+parse_view_param({json_req, {Props}}) ->
+    %% default to an empty query object, as handle_view_changes/3 does,
+    %% so that a request without "query" is rejected as a bad request
+    %% instead of crashing with a badmatch
+    {Query} = couch_util:get_value(<<"query">>, Props, {[]}),
+    parse_view_param1(couch_util:get_value(<<"view">>, Query, <<"">>));
+parse_view_param(Req) ->
+    parse_view_param1(list_to_binary(couch_httpd:qs_value(Req, "view", ""))).
+
+%% Splits a "ddoc/view" parameter on "/" and returns
+%% `{<<"_design/ddoc">>, <<"view">>}`. Anything that does not split
+%% into exactly two parts is rejected with a bad_request.
+parse_view_param1(ViewParam) ->
+    design_and_view(re:split(ViewParam, <<"/">>)).
+
+%% Maps the split parts to the design doc id and view name.
+design_and_view([DName, ViewName]) ->
+    {<< "_design/", DName/binary >>, ViewName};
+design_and_view(_) ->
+    throw({bad_request, "Invalid `view` parameter."}).
+
+%% Converts raw view query options (`{Key, Value}` pairs taken from the
+%% query string or from a JSON "query" object) into the option list
+%% passed as `view_options`. Recognised options are prepended to `Acc0`
+%% in input order; unknown keys are ignored.
+%% Throws `{query_parse_error, _}` for `count` and for an invalid
+%% `stale` value.
+parse_view_options(Options, Acc0) ->
+    lists:foldl(fun({K, V}, Acc) ->
+        parse_view_option(couch_util:to_binary(K), V, Acc)
+    end, Acc0, Options).
+
+%% Adds the parsed form of a single option (key already converted to a
+%% binary) to the accumulated options.
+parse_view_option(<<"reduce">>, V, Acc) ->
+    %% keep the options collected so far
+    [{reduce, couch_mrview_http:parse_boolean(V)} | Acc];
+parse_view_option(<<"key">>, V, Acc) ->
+    Key = parse_json(V),
+    [{start_key, Key}, {end_key, Key} | Acc];
+parse_view_option(<<"keys">>, V, Acc) ->
+    [{keys, parse_json(V)} | Acc];
+parse_view_option(K, V, Acc)
+        when K =:= <<"startkey">>; K =:= <<"start_key">> ->
+    [{start_key, parse_json(V)} | Acc];
+parse_view_option(K, V, Acc)
+        when K =:= <<"startkey_docid">>; K =:= <<"start_key_docid">> ->
+    [{start_key_docid, couch_util:to_binary(V)} | Acc];
+parse_view_option(K, V, Acc)
+        when K =:= <<"endkey">>; K =:= <<"end_key">> ->
+    [{end_key, parse_json(V)} | Acc];
+parse_view_option(K, V, Acc)
+        when K =:= <<"endkey_docid">>; K =:= <<"end_key_docid">> ->
+    %% the end doc id bounds the end of the range, not the start
+    [{end_key_docid, couch_util:to_binary(V)} | Acc];
+parse_view_option(<<"limit">>, V, Acc) ->
+    [{limit, couch_mrview_http:parse_pos_int(V)} | Acc];
+parse_view_option(<<"count">>, _V, _Acc) ->
+    throw({query_parse_error, <<"QS param `count` is not `limit`">>});
+parse_view_option(<<"stale">>, V, Acc)
+        when V =:= <<"ok">>; V =:= "ok" ->
+    [{stale, ok} | Acc];
+parse_view_option(<<"stale">>, V, Acc)
+        when V =:= <<"update_after">>; V =:= "update_after" ->
+    [{stale, update_after} | Acc];
+parse_view_option(<<"stale">>, _V, _Acc) ->
+    throw({query_parse_error, <<"Invalid value for `stale`.">>});
+parse_view_option(<<"descending">>, V, Acc) ->
+    Direction = case couch_mrview_http:parse_boolean(V) of
+        true -> rev;
+        _ -> fwd
+    end,
+    [{direction, Direction} | Acc];
+parse_view_option(<<"skip">>, V, Acc) ->
+    [{skip, couch_mrview_http:parse_pos_int(V)} | Acc];
+parse_view_option(<<"group">>, V, Acc) ->
+    GroupLevel = case couch_mrview_http:parse_boolean(V) of
+        true -> exact;
+        _ -> 0
+    end,
+    [{group_level, GroupLevel} | Acc];
+parse_view_option(<<"group_level">>, V, Acc) ->
+    [{group_level, couch_mrview_http:parse_pos_int(V)} | Acc];
+parse_view_option(<<"inclusive_end">>, V, Acc) ->
+    %% keep the options collected so far
+    [{inclusive_end, couch_mrview_http:parse_boolean(V)} | Acc];
+parse_view_option(_Unknown, _V, Acc) ->
+    Acc.
+
+%% Decodes a value given as a string (list) with ?JSON_DECODE; any other
+%% term is returned untouched.
+parse_json(V) ->
+    case is_list(V) of
+        true -> ?JSON_DECODE(V);
+        false -> V
+    end.
+
+%% Returns the `deleted` member of a change row for a deleted revision,
+%% and no member at all for anything else.
+deleted_item(Deleted) ->
+    case Deleted of
+        true -> [{<<"deleted">>, true}];
+        _ -> []
+    end.