You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ei...@apache.org on 2019/11/22 01:57:56 UTC
[couchdb] 07/14: Remove view_changes parts from couch_changes
This is an automated email from the ASF dual-hosted git repository.
eiri pushed a commit to branch 2167-no-view-changes
in repository https://gitbox.apache.org/repos/asf/couchdb.git
commit 61a9c25c718aecf6f93ef385ecf89466a1244b74
Author: Eric Avdey <ei...@eiri.ca>
AuthorDate: Tue Nov 12 13:45:26 2019 -0400
Remove view_changes parts from couch_changes
---
src/couch/src/couch_changes.erl | 215 ++++------------------------------------
1 file changed, 18 insertions(+), 197 deletions(-)
diff --git a/src/couch/src/couch_changes.erl b/src/couch/src/couch_changes.erl
index c5b5edf..7dc6382 100644
--- a/src/couch/src/couch_changes.erl
+++ b/src/couch/src/couch_changes.erl
@@ -70,40 +70,11 @@ handle_changes(Args1, Req, Db0, Type) ->
} = Args1,
Filter = configure_filter(FilterName, Style, Req, Db0),
Args = Args1#changes_args{filter_fun = Filter},
- % The type of changes feed depends on the supplied filter. If the query is
- % for an optimized view-filtered db changes, we need to use the view
- % sequence tree.
- {UseViewChanges, DDocName, ViewName} = case {Type, Filter} of
- {{view, DDocName0, ViewName0}, _} ->
- {true, DDocName0, ViewName0};
- {_, {fast_view, _, DDoc, ViewName0}} ->
- {true, DDoc#doc.id, ViewName0};
- _ ->
- {false, undefined, undefined}
- end,
DbName = couch_db:name(Db0),
- {StartListenerFun, View} = if UseViewChanges ->
- {ok, {_, View0, _}, _, _} = couch_mrview_util:get_view(
- DbName, DDocName, ViewName, #mrargs{}),
- case View0#mrview.seq_btree of
- #btree{} ->
- ok;
- _ ->
- throw({bad_request, "view changes not enabled"})
- end,
- SNFun = fun() ->
- couch_event:link_listener(
- ?MODULE, handle_view_event, {self(), DDocName}, [{dbname, DbName}]
- )
- end,
- {SNFun, View0};
- true ->
- SNFun = fun() ->
- couch_event:link_listener(
- ?MODULE, handle_db_event, self(), [{dbname, DbName}]
- )
- end,
- {SNFun, undefined}
+ StartListenerFun = fun() ->
+ couch_event:link_listener(
+ ?MODULE, handle_db_event, self(), [{dbname, DbName}]
+ )
end,
Start = fun() ->
{ok, Db} = couch_db:reopen(Db0),
@@ -113,14 +84,7 @@ handle_changes(Args1, Req, Db0, Type) ->
fwd ->
Since
end,
- View2 = if UseViewChanges ->
- {ok, {_, View1, _}, _, _} = couch_mrview_util:get_view(
- DbName, DDocName, ViewName, #mrargs{}),
- View1;
- true ->
- undefined
- end,
- {Db, View2, StartSeq}
+ {Db, StartSeq}
end,
% begin timer to deal with heartbeat when filter function fails
case Args#changes_args.heartbeat of
@@ -136,12 +100,11 @@ handle_changes(Args1, Req, Db0, Type) ->
{Callback, UserAcc} = get_callback_acc(CallbackAcc),
{ok, Listener} = StartListenerFun(),
- {Db, View, StartSeq} = Start(),
+ {Db, StartSeq} = Start(),
UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
{Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
Acc0 = build_acc(Args, Callback, UserAcc2, Db, StartSeq,
- <<"">>, Timeout, TimeoutFun, DDocName, ViewName,
- View),
+ <<"">>, Timeout, TimeoutFun),
try
keep_sending_changes(
Args#changes_args{dir=fwd},
@@ -157,10 +120,10 @@ handle_changes(Args1, Req, Db0, Type) ->
{Callback, UserAcc} = get_callback_acc(CallbackAcc),
UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
{Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
- {Db, View, StartSeq} = Start(),
+ {Db, StartSeq} = Start(),
Acc0 = build_acc(Args#changes_args{feed="normal"}, Callback,
- UserAcc2, Db, StartSeq, <<>>, Timeout, TimeoutFun,
- DDocName, ViewName, View),
+ UserAcc2, Db, StartSeq, <<>>,
+ Timeout, TimeoutFun),
{ok, #changes_acc{seq = LastSeq, user_acc = UserAcc3}} =
send_changes(
Acc0,
@@ -214,21 +177,12 @@ configure_filter("_view", Style, Req, Db) ->
[DName, VName] ->
{ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
check_member_exists(DDoc, [<<"views">>, VName]),
- FilterType = try
- true = couch_util:get_nested_json_value(
- DDoc#doc.body,
- [<<"options">>, <<"seq_indexed">>]
- ),
- fast_view
- catch _:_ ->
- view
- end,
case couch_db:is_clustered(Db) of
true ->
DIR = fabric_util:doc_id_and_rev(DDoc),
- {fetch, FilterType, Style, DIR, VName};
+ {fetch, view, Style, DIR, VName};
false ->
- {FilterType, Style, DDoc, VName}
+ {view, Style, DDoc, VName}
end;
[] ->
Msg = "`view` must be of the form `designname/viewname`",
@@ -285,8 +239,7 @@ filter(_Db, DocInfo, {design_docs, Style}) ->
_ ->
[]
end;
-filter(Db, DocInfo, {FilterType, Style, DDoc, VName})
- when FilterType == view; FilterType == fast_view ->
+filter(Db, DocInfo, {view, Style, DDoc, VName}) ->
Docs = open_revs(Db, DocInfo, Style),
{ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs),
filter_revs(Passes, Docs);
@@ -493,7 +446,7 @@ start_sending_changes(_Callback, UserAcc, ResponseType)
start_sending_changes(Callback, UserAcc, ResponseType) ->
Callback(start, ResponseType, UserAcc).
-build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, DDocName, ViewName, View) ->
+build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) ->
#changes_args{
include_docs = IncludeDocs,
doc_options = DocOpts,
@@ -516,9 +469,6 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun, D
conflicts = Conflicts,
timeout = Timeout,
timeout_fun = TimeoutFun,
- ddoc_name = DDocName,
- view_name = ViewName,
- view = View,
aggregation_results=[],
aggregation_kvs=[]
}.
@@ -527,41 +477,15 @@ send_changes(Acc, Dir, FirstRound) ->
#changes_acc{
db = Db,
seq = StartSeq,
- filter = Filter,
- view = View
+ filter = Filter
} = Acc,
DbEnumFun = fun changes_enumerator/2,
case can_optimize(FirstRound, Filter) of
{true, Fun} ->
Fun(Db, StartSeq, Dir, DbEnumFun, Acc, Filter);
_ ->
- case {View, Filter} of
- {#mrview{}, {fast_view, _, _, _}} ->
- couch_mrview:view_changes_since(View, StartSeq, DbEnumFun, [{dir, Dir}], Acc);
- {undefined, _} ->
- Opts = [{dir, Dir}],
- couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts);
- {#mrview{}, _} ->
- ViewEnumFun = fun view_changes_enumerator/2,
- {Go, Acc0} = couch_mrview:view_changes_since(View, StartSeq, ViewEnumFun, [{dir, Dir}], Acc),
- case Acc0 of
- #changes_acc{aggregation_results=[]} ->
- {Go, Acc0};
- _ ->
- #changes_acc{
- aggregation_results = AggResults,
- aggregation_kvs = AggKVs,
- user_acc = UserAcc,
- callback = Callback,
- resp_type = ResponseType,
- prepend = Prepend
- } = Acc0,
- ChangesRow = view_changes_row(AggResults, AggKVs, Acc0),
- UserAcc0 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
- reset_heartbeat(),
- {Go, Acc0#changes_acc{user_acc=UserAcc0}}
- end
- end
+ Opts = [{dir, Dir}],
+ couch_db:fold_changes(Db, StartSeq, DbEnumFun, Acc, Opts)
end.
@@ -653,8 +577,7 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
#changes_acc{
db = Db, callback = Callback,
timeout = Timeout, timeout_fun = TimeoutFun, seq = EndSeq,
- prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit,
- ddoc_name = DDocName, view_name = ViewName
+ prepend = Prepend2, user_acc = UserAcc2, limit = NewLimit
} = ChangesAcc,
couch_db:close(Db),
@@ -670,7 +593,6 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
Args#changes_args{limit=NewLimit},
ChangesAcc#changes_acc{
db = Db2,
- view = maybe_refresh_view(Db2, DDocName, ViewName),
user_acc = UserAcc4,
seq = EndSeq,
prepend = Prepend2,
@@ -685,84 +607,9 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
end
end.
-maybe_refresh_view(_, undefined, undefined) ->
- undefined;
-maybe_refresh_view(Db, DDocName, ViewName) ->
- DbName = couch_db:name(Db),
- {ok, {_, View, _}, _, _} = couch_mrview_util:get_view(DbName, DDocName, ViewName, #mrargs{}),
- View.
-
end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
Callback({stop, EndSeq}, ResponseType, UserAcc).
-view_changes_enumerator(Value, Acc) ->
- #changes_acc{
- filter = Filter, callback = Callback, prepend = Prepend,
- user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
- timeout = Timeout, timeout_fun = TimeoutFun, seq = CurrentSeq,
- aggregation_kvs=AggKVs, aggregation_results=AggResults
- } = Acc,
-
- Results0 = view_filter(Db, Value, Filter),
- Results = [Result || Result <- Results0, Result /= null],
- {{Seq, _}, _} = Value,
-
- Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
-
- if CurrentSeq =:= Seq ->
- NewAggKVs = case Results of
- [] -> AggKVs;
- _ -> [Value|AggKVs]
- end,
- {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
- Acc0 = Acc#changes_acc{
- seq = Seq,
- user_acc = UserAcc2,
- aggregation_kvs=NewAggKVs
- },
- case Done of
- stop -> {stop, Acc0};
- ok -> {Go, Acc0}
- end;
- AggResults =/= [] ->
- {NewAggKVs, NewAggResults} = case Results of
- [] -> {[], []};
- _ -> {[Value], Results}
- end,
- if ResponseType =:= "continuous" orelse ResponseType =:= "eventsource" ->
- ChangesRow = view_changes_row(AggResults, AggKVs, Acc),
- UserAcc2 = Callback({change, ChangesRow, <<>>}, ResponseType, UserAcc),
- reset_heartbeat(),
- {Go, Acc#changes_acc{
- seq = Seq, user_acc = UserAcc2, limit = Limit - 1,
- aggregation_kvs=NewAggKVs, aggregation_results=NewAggResults}};
- true ->
- ChangesRow = view_changes_row(AggResults, AggKVs, Acc),
- UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
- reset_heartbeat(),
- {Go, Acc#changes_acc{
- seq = Seq, prepend = <<",\n">>, user_acc = UserAcc2,
- limit = Limit - 1, aggregation_kvs=[Value],
- aggregation_results=Results}}
- end;
- true ->
- {NewAggKVs, NewAggResults} = case Results of
- [] -> {[], []};
- _ -> {[Value], Results}
- end,
- {Done, UserAcc2} = maybe_heartbeat(Timeout, TimeoutFun, UserAcc),
- Acc0 = Acc#changes_acc{
- seq = Seq,
- user_acc = UserAcc2,
- aggregation_kvs=NewAggKVs,
- aggregation_results=NewAggResults
- },
- case Done of
- stop -> {stop, Acc0};
- ok -> {Go, Acc0}
- end
- end.
-
changes_enumerator(Value0, Acc) ->
#changes_acc{
filter = Filter, callback = Callback, prepend = Prepend,
@@ -812,32 +659,6 @@ changes_enumerator(Value0, Acc) ->
-view_changes_row(Results, KVs, Acc) ->
- {Add, Remove} = lists:foldl(fun(Row, {AddAcc, RemAcc}) ->
- {{_Seq, Key}, {_Id, Value, _Rev}} = Row,
- case Value of
- removed ->
- {AddAcc, [Key|RemAcc]};
- {dups, DupValues} ->
- AddAcc1 = lists:foldl(fun(DupValue, AddAcc0) ->
- [[Key, DupValue]|AddAcc0]
- end, AddAcc, DupValues),
- {AddAcc1, RemAcc};
- _ ->
- {[[Key, Value]|AddAcc], RemAcc}
- end
- end, {[], []}, KVs),
-
- % Seq, Id, and Rev should be the same for all KVs, since we're aggregating
- % by seq.
- [{{Seq, _Key}, {Id, _Value, Rev}}|_] = KVs,
-
- {[
- {<<"seq">>, Seq}, {<<"id">>, Id}, {<<"add">>, Add},
- {<<"remove">>, Remove}, {<<"changes">>, Results}
- ] ++ maybe_get_changes_doc({Id, Rev}, Acc)}.
-
-
changes_row(Results, #full_doc_info{} = FDI, Acc) ->
changes_row(Results, couch_doc:to_doc_info(FDI), Acc);
changes_row(Results, DocInfo, Acc) ->