You are viewing a plain text version of this content. The canonical link for it (a hyperlink on the word "here" in the original page) was not preserved in this plain-text rendering.
Posted to commits@couchdb.apache.org by da...@apache.org on 2014/12/04 21:09:39 UTC

[08/22] couch commit: updated refs/heads/2491-refactor-couch-httpd-auth to 3e8286d

Add view filtering optimization to changes feeds


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/20e585dd
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/20e585dd
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/20e585dd

Branch: refs/heads/2491-refactor-couch-httpd-auth
Commit: 20e585dd07e4da0fe67ace09f0a291e3fe759534
Parents: 6d6b801
Author: Benjamin Bastian <be...@gmail.com>
Authored: Fri Aug 22 23:56:56 2014 +0700
Committer: Benjamin Bastian <be...@gmail.com>
Committed: Fri Oct 31 12:43:53 2014 -0700

----------------------------------------------------------------------
 src/couch_changes.erl       | 181 ++++++++++++++++++++++++---------------
 src/couch_httpd_changes.erl | 180 +-------------------------------------
 2 files changed, 116 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/20e585dd/src/couch_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_changes.erl b/src/couch_changes.erl
index abe8cf9..259f83c 100644
--- a/src/couch_changes.erl
+++ b/src/couch_changes.erl
@@ -63,25 +63,45 @@ handle_changes(Args1, Req, Db0, Type) ->
         dir = Dir,
         since = Since
     } = Args1,
-    {StartListenerFun, DDocName, ViewName, View} = case Type of
-        {view, DDocName0, ViewName0} ->
-            SNFun = fun() ->
-                couch_event:link_listener(
-                     ?MODULE, handle_view_event, self(), [{dbname, Db0#db.name}]
-                )
-            end,
-            {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view(Db0#db.name, DDocName0, ViewName0, #mrargs{}),
-            {SNFun, DDocName0, ViewName0, View0};
-        db ->
-            SNFun = fun() ->
-                couch_event:link_listener(
-                     ?MODULE, handle_db_event, self(), [{dbname, Db0#db.name}]
-                )
-            end,
-            {SNFun, undefined, undefined, undefined}
-    end,
     Filter = configure_filter(FilterName, Style, Req, Db0),
     Args = Args1#changes_args{filter_fun = Filter},
+    UseViewChanges = case {Type, Filter} of % true: stream changes from a view's seq btree
+        {{view, _, _}, _} -> % caller requested a view's changes via Type
+            true;
+        {_, {fast_view, _, _, _}} -> % _view filter on a ddoc with options.seq_indexed = true
+            true;
+        _ ->
+            false
+    end,
+    {StartListenerFun, DDocName, ViewName, View} = if UseViewChanges ->
+        {DDocName0, ViewName0} = case {Type, Filter} of % Type takes precedence over the filter
+            {{view, DDocName1, ViewName1}, _} ->
+                {DDocName1, ViewName1};
+            {_, {fast_view, _, DDoc, ViewName1}} ->
+                {DDoc#doc.id, ViewName1}
+        end,
+        {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view(
+                Db0#db.name, DDocName0, ViewName0, #mrargs{}),
+        case View0#mrview.seq_btree of % anything but a #btree{} means view changes are not enabled
+            #btree{} ->
+                ok;
+            _ ->
+                throw({bad_request, "view changes not enabled"})
+        end,
+        SNFun = fun() -> % listener state is {self(), DDocName0}, unlike the db branch below
+            couch_event:link_listener(
+                 ?MODULE, handle_view_event, {self(), DDocName0}, [{dbname, Db0#db.name}]
+            )
+        end,
+        {SNFun, DDocName0, ViewName0, View0};
+    true -> % plain database changes feed
+        SNFun = fun() ->
+            couch_event:link_listener(
+                 ?MODULE, handle_db_event, self(), [{dbname, Db0#db.name}]
+            )
+        end,
+        {SNFun, undefined, undefined, undefined}
+    end,
     Start = fun() ->
         {ok, Db} = couch_db:reopen(Db0),
         StartSeq = case Dir of
@@ -180,7 +200,15 @@ configure_filter("_view", Style, Req, Db) ->
         [DName, VName] ->
             {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
             check_member_exists(DDoc, [<<"views">>, VName]),
-            {view, Style, DDoc, VName};
+            % options.seq_indexed = true in the design doc selects fast_view;
+            % any other value, or a failed lookup, keeps the plain view filter.
+            try couch_util:get_nested_json_value(
+                    DDoc#doc.body, [<<"options">>, <<"seq_indexed">>]) of
+                true -> {fast_view, Style, DDoc, VName};
+                _ -> {view, Style, DDoc, VName}
+            catch _:_ ->
+                {view, Style, DDoc, VName}
+            end;
         [] ->
             Msg = "`view` must be of the form `designname/viewname`",
             throw({bad_request, Msg})
@@ -237,6 +265,36 @@ filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) ->
     {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
     filter_revs(Passes, Docs).
 
+%% Resolve one view seq-btree row {{Seq, Key}, {DocId, Value}} for a fast_view
+%% filter. Returns {DocInfo, ChangeRevs} if Seq is the doc's current high_seq.
+fast_view_filter(Db, {{Seq, _}, {ID, _}}=KV, {fast_view, Style, _, _}) ->
+    case couch_db:get_doc_info(Db, ID) of
+        {ok, #doc_info{high_seq=Seq}=DocInfo} ->
+            Docs = open_revs(Db, DocInfo, Style),
+            Changes = lists:map(fun(#doc{revs={RevPos, [RevId | _]}}) ->
+                {[{<<"rev">>, couch_doc:rev_to_str({RevPos, RevId})}]}
+            end, Docs),
+            {DocInfo, Changes};
+        {ok, #doc_info{high_seq=HighSeq}} when Seq > HighSeq ->
+            % The row's seq is not the doc's current high_seq: the view seq
+            % tree and the db snapshot were read at different times. Emit
+            % nothing; a later fold over a reopened tree reports the doc.
+            % NOTE(review): the author meant to skip stale rows (Seq < HighSeq)
+            % and crash when the view is ahead of the db, but this guard does
+            % the opposite, so a stale row raises case_clause. Confirm.
+            % Skipped rows return the row itself, never `ok`: the caller
+            % (changes_enumerator) takes Seq from it and has no clause for ok.
+            {KV, []};
+        {error, not_found} ->
+            {KV, []}
+    end.
+
+%% Placeholder filter for view seq-btree rows (non-fast_view filtering): every
+%% row passes; filters other than {default, _} fail with function_clause.
+view_filter(_Db, _Row, {default, _}) ->
+    [ok]. % TODO: make a real thing
+
+
 get_view_qs({json_req, {Props}}) ->
     {Query} = couch_util:get_value(<<"query">>, Props, {[]}),
     binary_to_list(couch_util:get_value(<<"view">>, Query, ""));
@@ -477,7 +535,7 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
         db = Db, callback = Callback,
         timeout = Timeout, timeout_fun = TimeoutFun, seq = EndSeq,
         prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit,
-        ddoc_name = DDocName, view_name = ViewName, view = View
+        ddoc_name = DDocName, view_name = ViewName
     } = ChangesAcc,
 
     couch_db:close(Db),
@@ -517,54 +575,27 @@ maybe_refresh_view(Db, DDocName, ViewName) ->
 end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
     Callback({stop, EndSeq}, ResponseType, UserAcc).
 
-changes_enumerator(Value, #changes_acc{resp_type = ResponseType} = Acc)
-        when ResponseType =:= "continuous"
-        orelse ResponseType =:= "eventsource" ->
-    #changes_acc{
-        filter = Filter, callback = Callback,
-        user_acc = UserAcc, limit = Limit, db = Db,
-        timeout = Timeout, timeout_fun = TimeoutFun,
-        view = View
-    } = Acc,
-    {Seq, Results0} = case View of
-        undefined ->
-            {Value#doc_info.high_seq, filter(Db, Value, Filter)};
-        #mrview{} ->
-            {{Seq0, _}, _} = Value,
-            {Seq0, [ok]} % TODO
-    end,
-    Results = [Result || Result <- Results0, Result /= null],
-    %% TODO: I'm thinking this should be < 1 and not =< 1
-    Go = if Limit =< 1 -> stop; true -> ok end,
-    case Results of
-    [] ->
-        {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
-        case Done of
-        stop ->
-            {stop, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}};
-        ok ->
-            {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
-        end;
-    _ ->
-        ChangesRow = changes_row(Results, Value, Acc),
-        UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
-        reset_heartbeat(),
-        {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1}}
-    end;
-changes_enumerator(Value, Acc) ->
+changes_enumerator(Value0, Acc) ->
     #changes_acc{
         filter = Filter, callback = Callback, prepend = Prepend,
         user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
         timeout = Timeout, timeout_fun = TimeoutFun, view = View
     } = Acc,
-    {Seq, Results0} = case View of
-        undefined ->
-            {Value#doc_info.high_seq, filter(Db, Value, Filter)};
-        #mrview{} ->
-            {{Seq0,_}, _} = Value,
-            {Seq0, [ok]} % TODO view filter
+    {Value, Results0} = case {View, Filter} of
+        {_, {fast_view, _, _, _}} -> % checked first: fast_view feeds also set View
+            fast_view_filter(Db, Value0, Filter); % may swap the row for a #doc_info{}
+        {#mrview{}, _} ->
+            {Value0, view_filter(Db, Value0, Filter)};
+        {_, _} ->
+            {Value0, filter(Db, Value0, Filter)}
     end,
     Results = [Result || Result <- Results0, Result /= null],
+    Seq = case Value of % Value is a #doc_info{} or a view row {{Seq, Key}, {Id, V}}
+        #doc_info{} ->
+            Value#doc_info.high_seq;
+        {{Seq0, _}, _} ->
+            Seq0
+    end, % any other Value shape raises case_clause
     Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
     case Results of
     [] ->
@@ -576,20 +607,32 @@ changes_enumerator(Value, Acc) ->
             {Go, Acc#changes_acc{seq = Seq, user_acc = UserAcc2}}
         end;
     _ ->
-        ChangesRow = changes_row(Results, Value, Acc),
-        UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
-        reset_heartbeat(),
-        {Go, Acc#changes_acc{
-            seq = Seq, prepend = <<",\n">>,
-            user_acc = UserAcc2, limit = Limit - 1}}
+        % "continuous"/"eventsource" rows are sent with an empty prefix and
+        % leave `prepend` alone; other feeds send the current `prepend` and
+        % switch it to ",\n" for every later row.
+        Streaming = ResponseType =:= "continuous"
+            orelse ResponseType =:= "eventsource",
+        Sep = if Streaming -> <<>>; true -> Prepend end,
+        ChangesRow = changes_row(Results, Value, Acc),
+        UserAcc2 = Callback({change, ChangesRow, Sep}, ResponseType, UserAcc),
+        reset_heartbeat(),
+        Acc2 = Acc#changes_acc{seq = Seq, user_acc = UserAcc2, limit = Limit - 1},
+        {Go, if Streaming -> Acc2;
+             true -> Acc2#changes_acc{prepend = <<",\n">>}
+        end}
     end.
 
 
 
-changes_row(Results, SeqStuff, #changes_acc{view=#mrview{}}) ->
-    {{Seq, Key}, {Id, Value}} = SeqStuff,
+changes_row(Results, DocInfo, #changes_acc{filter={fast_view,_,_,_}}=Acc) -> % before the view clause: fast_view feeds also set view
+    format_doc_info_change(Results, DocInfo, Acc);
+changes_row(Results, KV, #changes_acc{view=#mrview{}}) -> % KV is a view row {{Seq, Key}, {Id, Value}}
+    {{Seq, Key}, {Id, Value}} = KV,
     {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"key">>, Key}, {<<"value">>, Value}, {<<"changes">>, Results}]};
-changes_row(Results, #doc_info{}=DocInfo, #changes_acc{view=undefined}=Acc) ->
+changes_row(Results, #doc_info{}=DocInfo, Acc) -> % plain db changes
+    format_doc_info_change(Results, DocInfo, Acc).
+
+format_doc_info_change(Results, #doc_info{}=DocInfo, Acc) ->
     #doc_info{
         id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]
     } = DocInfo,

http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/20e585dd/src/couch_httpd_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_httpd_changes.erl b/src/couch_httpd_changes.erl
index 9f141d9..4963a5f 100644
--- a/src/couch_httpd_changes.erl
+++ b/src/couch_httpd_changes.erl
@@ -14,19 +14,13 @@
 
 -export([handle_db_changes_req/2,
          handle_changes_req/4,
-         handle_view_filtered_changes/3,
-         parse_changes_query/3]).
+         parse_changes_query/2]).
 
 -include_lib("couch/include/couch_db.hrl").
 
 handle_db_changes_req(Req, Db) ->
-    ChangesArgs = parse_changes_query(Req, Db, false),
-    ChangesFun = case ChangesArgs#changes_args.filter of
-        "_view" ->
-            handle_view_filtered_changes(ChangesArgs, Req, Db);
-        _ ->
-            couch_changes:handle_db_changes(ChangesArgs, Req, Db)
-    end,
+    ChangesArgs = parse_changes_query(Req, Db),
+    ChangesFun = couch_changes:handle_db_changes(ChangesArgs, Req, Db), % "_view" is no longer special-cased here; couch_changes handles it (see its configure_filter)
     handle_changes_req(Req, Db, ChangesArgs, ChangesFun).
 
 handle_changes_req(#httpd{method='POST'}=Req, Db, ChangesArgs, ChangesFun) ->
@@ -126,170 +120,7 @@ handle_changes_req1(Req, #db{name=DbName}=Db, ChangesArgs, ChangesFun) ->
     end.
 
 
-%% wrapper around couch_mrview_changes.
-%% This wrapper mimic couch_changes:handle_db_changes/3 and return a
-%% Changefun that can be used by the handle_changes_req function. Also
-%% while couch_mrview_changes:handle_changes/6 is returning tha view
-%% changes this function return docs corresponding to the changes
-%% instead so it can be used to replace the _view filter.
-handle_view_filtered_changes(ChangesArgs, Req, Db) ->
-    %% parse view parameter
-    {DDocId, VName} = parse_view_param(Req),
-
-    %% get view options
-    Query = case Req of
-        {json_req, {Props}} ->
-            {Q} = couch_util:get_value(<<"query">>, Props, {[]}),
-            Q;
-        _ ->
-            couch_httpd:qs(Req)
-    end,
-    ViewOptions = parse_view_options(Query, []),
-
-    {ok, Infos} = couch_mrview:get_info(Db, DDocId),
-    case lists:member(<<"seq_indexed">>,
-                      proplists:get_value(update_options, Infos, [])) of
-        true ->
-            handle_view_filtered_changes(Db, DDocId, VName, ViewOptions, ChangesArgs,
-                                Req);
-        false when ViewOptions /= [] ->
-            ?LOG_ERROR("Tried to filter a non sequence indexed view~n",[]),
-            throw({bad_request, seqs_not_indexed});
-        false ->
-            %% old method we are getting changes using the btree instead
-            %% which is not efficient, log it
-            ?LOG_WARN("Get view changes with seq_indexed=false.~n", []),
-            couch_changes:handle_db_changes(ChangesArgs, Req, Db)
-    end.
-
-handle_view_filtered_changes(#db{name=DbName}=Db0, DDocId, VName, ViewOptions,
-                    ChangesArgs, Req) ->
-    #changes_args{
-        feed = ResponseType,
-        since = Since,
-        db_open_options = DbOptions} = ChangesArgs,
-
-    Options0 = [{since, Since},
-                {view_options, ViewOptions}],
-    Options = case ResponseType of
-        "continuous" -> [stream | Options0];
-        "eventsource" -> [stream | Options0];
-        "longpoll" -> [{stream, once} | Options0];
-        _ -> Options0
-    end,
-
-    %% reopen the db with the db options given to the changes args
-    couch_db:close(Db0),
-    DbOptions1 = [{user_ctx, Db0#db.user_ctx} | DbOptions],
-    {ok, Db} = couch_db:open(DbName, DbOptions1),
-
-
-    %% initialise the changes fun
-    ChangesFun = fun(Callback) ->
-            Callback(start, ResponseType),
-
-            Acc0 = {"", 0, Db, Callback, ChangesArgs},
-            couch_mrview_changes:handle_changes(DbName, DDocId, VName,
-                                               fun view_changes_cb/2,
-                                               Acc0, Options)
-    end,
-    ChangesFun.
-
-
-view_changes_cb(stop, {LastSeq, {_, _, _, Callback, Args}}) ->
-    Callback({stop, LastSeq}, Args#changes_args.feed);
-
-view_changes_cb(heartbeat, {_, _, _, Callback, Args}=Acc) ->
-    Callback(timeout, Args#changes_args.feed),
-    {ok, Acc};
-view_changes_cb({{Seq, _Key, DocId}, Val},
-                {Prepend, OldLimit, Db0, Callback, Args}=Acc) ->
-
-    %% is the key removed from the index?
-    Removed = case Val of
-        {[{<<"_removed">>, true}]} -> true;
-        _ -> false
-    end,
-
-    #changes_args{
-        feed = ResponseType,
-        limit = Limit} = Args,
-
-    %% if the doc sequence is > to the one in the db record, reopen the
-    %% database since it means we don't have the latest db value.
-    Db = case Db0#db.update_seq >= Seq of
-        true -> Db0;
-        false ->
-            {ok, Db1} = couch_db:reopen_db(Db0),
-            Db1
-    end,
-
-    case couch_db:get_doc_info(Db, DocId) of
-        {ok, DocInfo} ->
-            %% get change row
-            {Deleted, ChangeRow} = view_change_row(Db, DocInfo, Args),
-
-            case Removed of
-                true when Deleted /= true ->
-                    %% the key has been removed from the view but the
-                    %% document hasn't been deleted so ignore it.
-                    {ok, Acc};
-                _ ->
-                    %% emit change row
-                    Callback({change, ChangeRow, Prepend}, ResponseType),
-
-                    %% if we achieved the limit, stop here, else continue.
-                    NewLimit = OldLimit + 1,
-                    if Limit > NewLimit ->
-                            {ok, {<<",\n">>, NewLimit, Db, Callback, Args}};
-                        true ->
-                            {stop, {<<"">>, NewLimit, Db, Callback, Args}}
-                    end
-            end;
-        {error, not_found} ->
-            %% doc not found, continue
-            {ok, Acc};
-        Error ->
-            throw(Error)
-    end.
-
-
-view_change_row(Db, DocInfo, Args) ->
-    #doc_info{id = Id, high_seq = Seq, revs = Revs} = DocInfo,
-    [#rev_info{rev=Rev, deleted=Del} | _] = Revs,
-
-    #changes_args{style=Style,
-                  include_docs=InDoc,
-                  doc_options = DocOpts,
-                  conflicts=Conflicts}=Args,
-
-    Changes = case Style of
-        main_only ->
-            [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
-        all_docs ->
-            [{[{<<"rev">>, couch_doc:rev_to_str(R)}]}
-                || #rev_info{rev=R} <- Revs]
-    end,
-
-    {Del, {[{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Changes}] ++
-     deleted_item(Del) ++ case InDoc of
-            true ->
-                Opts = case Conflicts of
-                    true -> [deleted, conflicts];
-                    false -> [deleted]
-                end,
-                Doc = couch_index_util:load_doc(Db, DocInfo, Opts),
-                case Doc of
-                    null ->
-                        [{doc, null}];
-                    _ ->
-                        [{doc, couch_doc:to_json_obj(Doc, DocOpts)}]
-                end;
-            false ->
-                []
-    end}}.
-
-parse_changes_query(Req, Db, IsViewChanges) ->
+parse_changes_query(Req, Db) ->
     ChangesArgs = lists:foldl(fun({Key, Value}, Args) ->
         case {string:to_lower(Key), Value} of
         {"feed", _} ->
@@ -426,6 +257,3 @@ parse_json(V) when is_list(V) ->
     ?JSON_DECODE(V);
 parse_json(V) ->
     V.
-
-deleted_item(true) -> [{<<"deleted">>, true}];
-deleted_item(_) -> [].