You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2014/12/04 21:09:35 UTC

[04/22] couch commit: updated refs/heads/2491-refactor-couch-httpd-auth to 3e8286d

add supports of view changes in the _changes API

Now when the option `seq_indexed=true` is set in the design doc, the
view filter in _changes will use it to retrieve the results. Compared to
the current way, using a view index will be faster to retrieve changes.
It also gives the possibility to filter changes by key or get changes in
a key range. All the view options can be used.

Note 1: if someone is trying to filter a changes with view options when
the views are not indexed by sequence, a 400 error will be returned.
Note 2: The changes will only be returned when the view is updated if
seq_indexed=true


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/7f2af21e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/7f2af21e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/7f2af21e

Branch: refs/heads/2491-refactor-couch-httpd-auth
Commit: 7f2af21e573abcd80b8c8412332d6439b4a777b3
Parents: fcb2882
Author: benoitc <bc...@gmail.com>
Authored: Fri Feb 7 15:38:34 2014 +0100
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Fri Oct 31 12:43:52 2014 -0700

----------------------------------------------------------------------
 src/couch_httpd_changes.erl | 250 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 246 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/7f2af21e/src/couch_httpd_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_httpd_changes.erl b/src/couch_httpd_changes.erl
index 1e431e9..56ce559 100644
--- a/src/couch_httpd_changes.erl
+++ b/src/couch_httpd_changes.erl
@@ -12,7 +12,9 @@
 
 -module(couch_httpd_changes).
 
--export([handle_changes_req/2]).
+-export([handle_changes_req/2,
+         handle_changes/3,
+         handle_view_changes/3]).
 
 -include_lib("couch/include/couch_db.hrl").
 
@@ -34,9 +36,7 @@ handle_changes_req1(Req, #db{name=DbName}=Db) ->
         % on other databases, _changes is free for all.
         ok
     end,
-    handle_changes_req2(Req, Db).
 
-handle_changes_req2(Req, Db) ->
     MakeCallback = fun(Resp) ->
         fun({change, {ChangeProp}=Change, _}, "eventsource") ->
             Seq = proplists:get_value(<<"seq">>, ChangeProp),
@@ -72,7 +72,7 @@ handle_changes_req2(Req, Db) ->
         end
     end,
     ChangesArgs = parse_changes_query(Req, Db),
-    ChangesFun = couch_changes:handle_changes(ChangesArgs, Req, Db),
+    ChangesFun = handle_changes(ChangesArgs, Req, Db),
     WrapperFun = case ChangesArgs#changes_args.feed of
     "normal" ->
         {ok, Info} = couch_db:get_db_info(Db),
@@ -116,6 +116,164 @@ handle_changes_req2(Req, Db) ->
     )
     end.
 
+
+handle_changes(ChangesArgs, Req, Db) ->
+    case ChangesArgs#changes_args.filter of
+        "_view" ->
+            handle_view_changes(ChangesArgs, Req, Db);
+        _ ->
+            couch_changes:handle_changes(ChangesArgs, Req, Db)
+    end.
+
+%% wrapper around couch_mrview_changes.
+%% This wrapper mimic couch_changes:handle_changes/3 and return a
+%% Changefun that can be used by the handle_changes_req function. Also
+%% while couch_mrview_changes:handle_changes/6 is returning tha view
+%% changes this function return docs corresponding to the changes
+%% instead so it can be used to replace the _view filter.
+handle_view_changes(ChangesArgs, Req, Db) ->
+    %% parse view parameter
+    {DDocId, VName} = parse_view_param(Req),
+
+    %% get view options
+    Query = case Req of
+        {json_req, {Props}} ->
+            {Q} = couch_util:get_value(<<"query">>, Props, {[]}),
+            Q;
+        _ ->
+            couch_httpd:qs(Req)
+    end,
+    ViewOptions = parse_view_options(Query, []),
+
+    {ok, Infos} = couch_mrview:get_info(Db, DDocId),
+    case lists:member(<<"seq_indexed">>,
+                      proplists:get_value(update_options, Infos, [])) of
+        true ->
+            handle_view_changes(Db, DDocId, VName, ViewOptions, ChangesArgs,
+                                Req);
+        false when ViewOptions /= [] ->
+            ?LOG_ERROR("Tried to filter a non sequence indexed view~n",[]),
+            throw({bad_request, seqs_not_indexed});
+        false ->
+            %% old method we are getting changes using the btree instead
+            %% which is not efficient, log it
+            ?LOG_WARN("Get view changes with seq_indexed=false.~n", []),
+            couch_changes:handle_changes(ChangesArgs, Req, Db)
+    end.
+
+handle_view_changes(#db{name=DbName}=Db0, DDocId, VName, ViewOptions,
+                    ChangesArgs, Req) ->
+    #changes_args{
+        feed = ResponseType,
+        since = Since,
+        db_open_options = DbOptions} = ChangesArgs,
+
+    Options0 = [{since, Since},
+                {view_options, ViewOptions}],
+    Options = case ResponseType of
+        "continuous" -> [stream | Options0];
+        "eventsource" -> [stream | Options0];
+        "longpoll" -> [{stream, once} | Options0];
+        _ -> Options0
+    end,
+
+    %% reopen the db with the db options given to the changes args
+    couch_db:close(Db0),
+    DbOptions1 = [{user_ctx, Db0#db.user_ctx} | DbOptions],
+    {ok, Db} = couch_db:open(DbName, DbOptions1),
+
+
+    %% initialise the changes fun
+    ChangesFun = fun(Callback) ->
+            Callback(start, ResponseType),
+
+            Acc0 = {"", 0, Db, Callback, ChangesArgs},
+            couch_mrview_changes:handle_changes(DbName, DDocId, VName,
+                                               fun view_changes_cb/2,
+                                               Acc0, Options)
+    end,
+    ChangesFun.
+
+
+view_changes_cb(stop, {LastSeq, {_, _, _, Callback, Args}}) ->
+    Callback({stop, LastSeq}, Args#changes_args.feed);
+
+view_changes_cb(heartbeat, {_, _, _, Callback, Args}=Acc) ->
+    Callback(timeout, Args#changes_args.feed),
+    {ok, Acc};
+view_changes_cb({{Seq, _Key, DocId}, _VAl},
+                {Prepend, OldLimit, Db0, Callback, Args}=Acc) ->
+
+    #changes_args{
+        feed = ResponseType,
+        limit = Limit} = Args,
+
+    %% if the doc sequence is > to the one in the db record, reopen the
+    %% database since it means we don't have the latest db value.
+    Db = case Db0#db.update_seq >= Seq of
+        true -> Db0;
+        false ->
+            {ok, Db1} = couch_db:reopen_db(Db0),
+            Db1
+    end,
+
+    case couch_db:get_doc_info(Db, DocId) of
+        {ok, DocInfo} ->
+            %% get change row
+            ChangeRow = view_change_row(Db, DocInfo, Args),
+            %% emit change row
+            Callback({change, ChangeRow, Prepend}, ResponseType),
+
+            %% if we achieved the limit, stop here, else continue.
+            NewLimit = OldLimit + 1,
+            if Limit > NewLimit ->
+                    {ok, {<<",\n">>, Db, NewLimit, Callback, Args}};
+                true ->
+                    {stop, {<<"">>, Db, NewLimit, Callback, Args}}
+            end;
+        {error, not_found} ->
+            %% doc not found, continue
+            {ok, Acc};
+        Error ->
+            throw(Error)
+    end.
+
+
+view_change_row(Db, DocInfo, Args) ->
+    #doc_info{id = Id, high_seq = Seq, revs = Revs} = DocInfo,
+    [#rev_info{rev=Rev, deleted=Del} | _] = Revs,
+
+    #changes_args{style=Style,
+                  include_docs=InDoc,
+                  doc_options = DocOpts,
+                  conflicts=Conflicts}=Args,
+
+    Changes = case Style of
+        main_only ->
+            [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
+        all_docs ->
+            [{[{<<"rev">>, couch_doc:rev_to_str(R)}]}
+                || #rev_info{rev=R} <- Revs]
+    end,
+
+    {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Changes}] ++
+     deleted_item(Del) ++ case InDoc of
+            true ->
+                Opts = case Conflicts of
+                    true -> [deleted, conflicts];
+                    false -> [deleted]
+                end,
+                Doc = couch_index_util:load_doc(Db, DocInfo, Opts),
+                case Doc of
+                    null ->
+                        [{doc, null}];
+                    _ ->
+                        [{doc, couch_doc:to_json_obj(Doc, DocOpts)}]
+                end;
+            false ->
+                []
+        end}.
+
 parse_changes_query(Req, Db) ->
     ChangesArgs = lists:foldl(fun({Key, Value}, Args) ->
         case {string:to_lower(Key), Value} of
@@ -172,3 +330,87 @@ parse_changes_query(Req, Db) ->
         _ ->
             ChangesArgs
     end.
+
+parse_view_param({json_req, {Props}}) ->
+    {Query} = couch_util:get_value(<<"query">>, Props),
+    parse_view_param1(couch_util:get_value(<<"view">>, Query, <<"">>));
+parse_view_param(Req) ->
+    parse_view_param1(list_to_binary(couch_httpd:qs_value(Req, "view", ""))).
+
+parse_view_param1(ViewParam) ->
+    case re:split(ViewParam, <<"/">>) of
+        [DName, ViewName] ->
+            {<< "_design/", DName/binary >>, ViewName};
+        _ ->
+            throw({bad_request, "Invalid `view` parameter."})
+    end.
+
+parse_view_options([], Acc) ->
+    Acc;
+parse_view_options([{K, V} | Rest], Acc) ->
+    Acc1 = case couch_util:to_binary(K) of
+        <<"reduce">> ->
+            [{reduce, couch_mrview_http:parse_boolean(V)}];
+        <<"key">> ->
+            V1 = parse_json(V),
+            [{start_key, V1}, {end_key, V1} | Acc];
+        <<"keys">> ->
+            [{keys, parse_json(V)} | Acc];
+        <<"startkey">> ->
+            [{start_key, parse_json(V)} | Acc];
+        <<"start_key">> ->
+            [{start_key, parse_json(V)} | Acc];
+        <<"startkey_docid">> ->
+            [{start_key_docid, couch_util:to_binary(V)} | Acc];
+        <<"start_key_docid">> ->
+            [{start_key_docid, couch_util:to_binary(V)} | Acc];
+        <<"endkey">> ->
+            [{end_key, parse_json(V)} | Acc];
+        <<"end_key">> ->
+            [{end_key, parse_json(V)} | Acc];
+        <<"endkey_docid">> ->
+            [{start_key_docid, couch_util:to_binary(V)} | Acc];
+        <<"end_key_docid">> ->
+            [{start_key_docid, couch_util:to_binary(V)} | Acc];
+        <<"limit">> ->
+            [{limit, couch_mrview_http:parse_pos_int(V)} | Acc];
+        <<"count">> ->
+            throw({query_parse_error, <<"QS param `count` is not `limit`">>});
+        <<"stale">> when V =:= <<"ok">> orelse V =:= "ok" ->
+            [{stale, ok} | Acc];
+        <<"stale">> when V =:= <<"update_after">> orelse V =:= "update_after" ->
+            [{stale, update_after} | Acc];
+        <<"stale">> ->
+            throw({query_parse_error, <<"Invalid value for `stale`.">>});
+        <<"descending">> ->
+            case couch_mrview_http:parse_boolean(V) of
+                true ->
+                    [{direction, rev} | Acc];
+                _ ->
+                    [{direction, fwd} | Acc]
+            end;
+        <<"skip">> ->
+            [{skip, couch_mrview_http:parse_pos_int(V)} | Acc];
+        <<"group">> ->
+            case couch_mrview_http:parse_booolean(V) of
+                true ->
+                    [{group_level, exact} | Acc];
+                _ ->
+                    [{group_level, 0} | Acc]
+            end;
+        <<"group_level">> ->
+            [{group_level, couch_mrview_http:parse_pos_int(V)} | Acc];
+        <<"inclusive_end">> ->
+            [{inclusive_end, couch_mrview_http:parse_boolean(V)}];
+        _ ->
+            Acc
+    end,
+    parse_view_options(Rest, Acc1).
+
+parse_json(V) when is_list(V) ->
+    ?JSON_DECODE(V);
+parse_json(V) ->
+    V.
+
+deleted_item(true) -> [{<<"deleted">>, true}];
+deleted_item(_) -> [].