You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to commits@couchdb.apache.org by ei...@apache.org on 2019/11/22 01:57:56 UTC

[couchdb] 07/14: Remove view_changes parts from couch_changes

This is an automated email from the ASF dual-hosted git repository.

eiri pushed a commit to branch 2167-no-view-changes
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 61a9c25c718aecf6f93ef385ecf89466a1244b74
Author: Eric Avdey <ei...@eiri.ca>
AuthorDate: Tue Nov 12 13:45:26 2019 -0400

    Remove view_changes parts from couch_changes
---
 src/couch/src/couch_changes.erl | 215 ++++------------------------------------
 1 file changed, 18 insertions(+), 197 deletions(-)

diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl
index c5b5edf..7dc6382 100644
--- a/src/couch/src/couch_changes.erl
+++ b/src/couch/src/couch_changes.erl
@@ -70,40 +70,11 @@ handle_changes(Args1, Req, Db0, Type) ->
     } = Args1,
     Filter = configure_filter(FilterName, Style, Req, Db0),
     Args = Args1#changes_args{filter_fun = Filter},
-    % The type of changes feed depends on the supplied filter. If the query is
-    % for an optimized view-filtered db changes, we need to use the view
-    % sequence tree.
-    {UseViewChanges, DDocName, ViewName} = case {Type, Filter} of
-        {{view, DDocName0, ViewName0}, _} ->
-            {true, DDocName0, ViewName0};
-        {_, {fast_view, _, DDoc, ViewName0}} ->
-            {true, DDoc#doc.id, ViewName0};
-        _ ->
-            {false, undefined, undefined}
-    end,
     DbName = couch_db:name(Db0),
-    {StartListenerFun, View} = if UseViewChanges ->
-        {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view(
-                DbName, DDocName, ViewName, #mrargs{}),
-        case View0#mrview.seq_btree of
-            #btree{} ->
-                ok;
-            _ ->
-                throw({bad_request, "view changes not enabled"})
-        end,
-        SNFun = fun() ->
-            couch_event:link_listener(
-                 ?MODULE, handle_view_event, {self(), DDocName}, [{dbname, DbName}]
-            )
-        end,
-        {SNFun, View0};
-    true ->
-        SNFun = fun() ->
-            couch_event:link_listener(
-                 ?MODULE, handle_db_event, self(), [{dbname, DbName}]
-            )
-        end,
-        {SNFun, undefined}
+    StartListenerFun = fun() ->
+        couch_event:link_listener(
+            ?MODULE, handle_db_event, self(), [{dbname, DbName}]
+        )
     end,
     Start = fun() ->
         {ok, Db} = couch_db:reopen(Db0),
@@ -113,14 +84,7 @@ handle_changes(Args1, Req, Db0, Type) ->
         fwd ->
             Since
         end,
-        View2 = if UseViewChanges ->
-            {ok, {_, View1, _}, _, _} = couch_mrview_util:get_view(
-                    DbName, DDocName, ViewName, #mrargs{}),
-            View1;
-        true ->
-            undefined
-        end,
-        {Db, View2, StartSeq}
+        {Db, StartSeq}
     end,
     % begin timer to deal with heartbeat when filter function fails
     case Args#changes_args.heartbeat of
@@ -136,12 +100,11 @@ handle_changes(Args1, Req, Db0, Type) ->
             {Callback, UserAcc} = get_callback_acc(CallbackAcc),
             {ok, Listener} = StartListenerFun(),
 
-            {Db, View, StartSeq} = Start(),
+            {Db, StartSeq} = Start(),
             UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
             {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
             Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq,
-                             <<"">>, Timeout, TimeoutFun, DDocName, ViewName,
-                             View),
+                             <<"">>, Timeout, TimeoutFun),
             try
                 keep_sending_changes(
                     Args#changes_args{dir=fwd},
@@ -157,10 +120,10 @@ handle_changes(Args1, Req, Db0, Type) ->
             {Callback, UserAcc} = get_callback_acc(CallbackAcc),
             UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
             {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
-            {Db, View, StartSeq} = Start(),
+            {Db, StartSeq} = Start(),
             Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback,
-                             UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun,
-                             DDocName, ViewName, View),
+                             UserAcc2, Db, StartSeq, <<>>,
+                             Timeout, TimeoutFun),
             {ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} =
                 send_changes(
                     Acc0,
@@ -214,21 +177,12 @@ configure_filter("_view", Style, Req, Db) ->
         [DName, VName] ->
             {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
             check_member_exists(DDoc, [<<"views">>, VName]),
-            FilterType = try
-                true = couch_util:get_nested_json_value(
-                        DDoc#doc.body,
-                        [<<"options">>, <<"seq_indexed">>]
-                ),
-                fast_view
-            catch _:_ ->
-                view
-            end,
             case couch_db:is_clustered(Db) of
                 true ->
                     DIR = fabric_util:doc_id_and_rev(DDoc),
-                    {fetch, FilterType, Style, DIR, VName};
+                    {fetch, view, Style, DIR, VName};
                 false ->
-                    {FilterType, Style, DDoc, VName}
+                    {view, Style, DDoc, VName}
             end;
         [] ->
             Msg = "`view` must be of the form `designname/viewname`",
@@ -285,8 +239,7 @@ filter(_Db, DocInfo, {design_docs, Style}) ->
         _ ->
             []
     end;
-filter(Db, DocInfo, {FilterType, Style, DDoc, VName})
-        when FilterType == view; FilterType == fast_view ->
+filter(Db, DocInfo, {view, Style, DDoc, VName}) ->
     Docs = open_revs(Db, DocInfo, Style),
     {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs),
     filter_revs(Passes, Docs);
@@ -493,7 +446,7 @@ start_sending_changes(_Callback, UserAcc, ResponseType)
 start_sending_changes(Callback, UserAcc, ResponseType) ->
     Callback(start, ResponseType, UserAcc).
 
-build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, DDocName, ViewName, View) ->
+build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) ->
     #changes_args{
         include_docs = IncludeDocs,
         doc_options = DocOpts,
@@ -516,9 +469,6 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, D
         conflicts = Conflicts,
         timeout = Timeout,
         timeout_fun = TimeoutFun,
-        ddoc_name = DDocName,
-        view_name = ViewName,
-        view = View,
         aggregation_results=[],
         aggregation_kvs=[]
     }.
@@ -527,41 +477,15 @@ send_changes(Acc, Dir, FirstRound) ->
     #changes_acc{
         db = Db,
         seq = StartSeq,
-        filter = Filter,
-        view = View
+        filter = Filter
     } = Acc,
     DbEnumFun = fun changes_enumerator/2,
     case can_optimize(FirstRound, Filter) of
         {true, Fun} ->
             Fun(Db, StartSeq, Dir, DbEnumFun, Acc, Filter);
         _ ->
-            case {View, Filter}  of
-                {#mrview{}, {fast_view, _, _, _}} ->
-                    couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
-                {undefined, _} ->
-                    Opts = [{dir, Dir}],
-                    couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts);
-                {#mrview{}, _} ->
-                    ViewEnumFun = fun view_changes_enumerator/2,
-                    {Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc),
-                    case Acc0 of
-                        #changes_acc{aggregation_results=[]} ->
-                            {Go, Acc0};
-                        _ ->
-                            #changes_acc{
-                                aggregation_results = AggResults,
-                                aggregation_kvs = AggKVs,
-                                user_acc = UserAcc,
-                                callback = Callback,
-                                resp_type = ResponseType,
-                                prepend = Prepend
-                            } = Acc0,
-                            ChangesRow = view_changes_row(AggResults, AggKVs, Acc0),
-                            UserAcc0 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
-                            reset_heartbeat(),
-                            {Go, Acc0#changes_acc{user_acc=UserAcc0}}
-                    end
-            end
+            Opts = [{dir, Dir}],
+            couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts)
     end.
 
 
@@ -653,8 +577,7 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
     #changes_acc{
         db = Db, callback = Callback,
         timeout = Timeout, timeout_fun = TimeoutFun, seq = EndSeq,
-        prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit,
-        ddoc_name = DDocName, view_name = ViewName
+        prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit
     } = ChangesAcc,
 
     couch_db:close(Db),
@@ -670,7 +593,6 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
                   Args#changes_args{limit=NewLimit},
                   ChangesAcc#changes_acc{
                     db = Db2,
-                    view = maybe_refresh_view(Db2, DDocName, ViewName),
                     user_acc = UserAcc4,
                     seq = EndSeq,
                     prepend = Prepend2,
@@ -685,84 +607,9 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
         end
     end.
 
-maybe_refresh_view(_, undefined, undefined) ->
-    undefined;
-maybe_refresh_view(Db, DDocName, ViewName) ->
-    DbName = couch_db:name(Db),
-    {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(DbName, DDocName, ViewName, #mrargs{}),
-    View.
-
 end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
     Callback({stop, EndSeq}, ResponseType, UserAcc).
 
-view_changes_enumerator(Value, Acc) ->
-    #changes_acc{
-        filter = Filter, callback = Callback, prepend = Prepend,
-        user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
-        timeout = Timeout, timeout_fun = TimeoutFun, seq = CurrentSeq,
-        aggregation_kvs=AggKVs, aggregation_results=AggResults
-    } = Acc,
-
-    Results0 = view_filter(Db, Value, Filter),
-    Results = [Result || Result <- Results0, Result /= null],
-    {{Seq, _}, _} = Value,
-
-    Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
-
-    if CurrentSeq =:= Seq ->
-        NewAggKVs = case Results of
-            [] -> AggKVs;
-            _ -> [Value|AggKVs]
-        end,
-        {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
-        Acc0 = Acc#changes_acc{
-            seq = Seq,
-            user_acc = UserAcc2,
-            aggregation_kvs=NewAggKVs
-        },
-        case Done of
-            stop -> {stop, Acc0};
-            ok -> {Go, Acc0}
-        end;
-    AggResults =/= [] ->
-        {NewAggKVs, NewAggResults} = case Results of
-            [] -> {[], []};
-            _ -> {[Value], Results}
-        end,
-        if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" ->
-            ChangesRow = view_changes_row(AggResults, AggKVs, Acc),
-            UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
-            reset_heartbeat(),
-            {Go, Acc#changes_acc{
-                seq = Seq, user_acc = UserAcc2, limit = Limit - 1,
-                aggregation_kvs=NewAggKVs, aggregation_results=NewAggResults}};
-        true ->
-            ChangesRow = view_changes_row(AggResults, AggKVs, Acc),
-            UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
-            reset_heartbeat(),
-            {Go, Acc#changes_acc{
-                seq = Seq, prepend = <<",\n">>, user_acc = UserAcc2,
-                limit = Limit - 1, aggregation_kvs=[Value],
-                aggregation_results=Results}}
-        end;
-    true ->
-        {NewAggKVs, NewAggResults} = case Results of
-            [] -> {[], []};
-            _ -> {[Value], Results}
-        end,
-        {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
-        Acc0 = Acc#changes_acc{
-            seq = Seq,
-            user_acc = UserAcc2,
-            aggregation_kvs=NewAggKVs,
-            aggregation_results=NewAggResults
-        },
-        case Done of
-            stop -> {stop, Acc0};
-            ok -> {Go, Acc0}
-        end
-    end.
-
 changes_enumerator(Value0, Acc) ->
     #changes_acc{
         filter = Filter, callback = Callback, prepend = Prepend,
@@ -812,32 +659,6 @@ changes_enumerator(Value0, Acc) ->
 
 
 
-view_changes_row(Results, KVs, Acc) ->
-    {Add, Remove} = lists:foldl(fun(Row, {AddAcc, RemAcc}) ->
-        {{_Seq, Key}, {_Id, Value, _Rev}} = Row,
-        case Value of
-            removed ->
-                {AddAcc, [Key|RemAcc]};
-            {dups, DupValues} ->
-                AddAcc1 = lists:foldl(fun(DupValue, AddAcc0) ->
-                    [[Key, DupValue]|AddAcc0]
-                end, AddAcc, DupValues),
-                {AddAcc1, RemAcc};
-            _ ->
-                {[[Key, Value]|AddAcc], RemAcc}
-        end
-    end, {[], []}, KVs),
-
-    % Seq, Id, and Rev should be the same for all KVs, since we're aggregating
-    % by seq.
-    [{{Seq, _Key}, {Id, _Value, Rev}}|_] = KVs,
-
-    {[
-        {<<"seq">>, Seq}, {<<"id">>, Id}, {<<"add">>, Add},
-        {<<"remove">>, Remove}, {<<"changes">>, Results}
-    ] ++ maybe_get_changes_doc({Id, Rev}, Acc)}.
-
-
 changes_row(Results, #full_doc_info{} = FDI, Acc) ->
     changes_row(Results, couch_doc:to_doc_info(FDI), Acc);
 changes_row(Results, DocInfo, Acc) ->