You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2014/02/10 21:22:48 UTC
[26/50] couch commit: updated refs/heads/import to 09c6556
Enable changes for cluster access
This mostly involves rewriting filter functions so that they aren't
anonymous closures. Anonymous closures break clusters when code is
upgraded, which is particularly problematic for the _changes feed since
it's a long-lived request.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/e09b8074
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/e09b8074
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/e09b8074
Branch: refs/heads/import
Commit: e09b8074fec59a508905b700c5252df7eb5b5338
Parents: 15b84c0
Author: Paul J. Davis <pa...@gmail.com>
Authored: Mon Mar 11 13:39:42 2013 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Tue Feb 4 17:03:25 2014 -0600
----------------------------------------------------------------------
src/couch_changes.erl | 393 ++++++++++++++++++++++-----------------------
1 file changed, 196 insertions(+), 197 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e09b8074/src/couch_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_changes.erl b/src/couch_changes.erl
index d36b45f..4346109 100644
--- a/src/couch_changes.erl
+++ b/src/couch_changes.erl
@@ -18,11 +18,12 @@
get_changes_timeout/2,
wait_db_updated/3,
get_rest_db_updated/1,
- make_filter_fun/4,
- main_only_filter/1,
- all_docs_filter/1
+ configure_filter/4,
+ filter/3
]).
+-export([changes_enumerator/2]).
+
% For the builtin filter _doc_ids, this is the maximum number
% of documents for which we trigger the optimized code path.
-define(MAX_DOC_IDS, 100).
@@ -51,8 +52,8 @@ handle_changes(Args1, Req, Db0) ->
dir = Dir,
since = Since
} = Args1,
- {FilterFun, FilterArgs} = make_filter_fun(FilterName, Style, Req, Db0),
- Args = Args1#changes_args{filter_fun = FilterFun, filter_args = FilterArgs},
+ Filter = configure_filter(FilterName, Style, Req, Db0),
+ Args = Args1#changes_args{filter_fun = Filter},
Start = fun() ->
{ok, Db} = couch_db:reopen(Db0),
StartSeq = case Dir of
@@ -120,136 +121,153 @@ get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) ->
get_callback_acc(Callback) when is_function(Callback, 2) ->
{fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}.
-make_filter_fun([$_ | _] = FilterName, Style, Req, Db) ->
- builtin_filter_fun(FilterName, Style, Req, Db);
-make_filter_fun(_, main_only, _, _) ->
- fun ?MODULE:main_only_filter/1;
-make_filter_fun(_, all_docs, _, _) ->
- fun ?MODULE:all_docs_filter/1;
-make_filter_fun(FilterName, Style, Req, Db) ->
- {os_filter_fun(FilterName, Style, Req, Db), []}.
-
-os_filter_fun(FilterName, Style, Req, Db) ->
- case [list_to_binary(couch_httpd:unquote(Part))
- || Part <- string:tokens(FilterName, "/")] of
- [] ->
- fun(_Db2, #doc_info{revs=Revs}) ->
- builtin_results(Style, Revs)
- end;
- [DName, FName] ->
- DesignId = <<"_design/", DName/binary>>,
- DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, [ejson_body]),
- % validate that the ddoc has the filter fun
- #doc{body={Props}} = DDoc,
- couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
- fun(Db2, DocInfo) ->
- DocInfos =
- case Style of
- main_only ->
- [DocInfo];
- all_docs ->
- [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs]
- end,
- Docs = [Doc || {ok, Doc} <- [
- couch_db:open_doc(Db2, DocInfo2, [deleted, conflicts])
- || DocInfo2 <- DocInfos]],
- {ok, Passes} = couch_query_servers:filter_docs(
- Req, Db2, DDoc, FName, Docs
- ),
- [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
- || {Pass, #doc{revs={RevPos,[RevId|_]}}}
- <- lists:zip(Passes, Docs), Pass == true]
- end;
- _Else ->
- throw({bad_request,
- "filter parameter must be of the form `designname/filtername`"})
+
+configure_filter("_doc_ids", Style, Req, _Db) ->
+ {doc_ids, Style, get_doc_ids(Req)};
+configure_filter("_design", Style, _Req, _Db) ->
+ {design_docs, Style};
+configure_filter("_view", Style, Req, Db) ->
+ ViewName = couch_httpd:qs_value(Req, "view", ""),
+ if ViewName /= "" -> ok; true ->
+ throw({bad_request, "`view` filter parameter is not provided."})
+ end,
+ ViewNameParts = string:tokens(ViewName, "/"),
+ case [?l2b(couch_httpd:unquote(Part)) || Part <- ViewNameParts] of
+ [DName, VName] ->
+ {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
+ check_member_exists(DDoc, [<<"views">>, VName]),
+ {view, Style, DDoc, VName};
+ _Else -> % any other shape ("/", "a", "a/b/c") is a client error, not a case_clause crash
+ Msg = "`view` must be of the form `designname/viewname`",
+ throw({bad_request, Msg})
+ end;
+configure_filter([$_ | _], _Style, _Req, _Db) ->
+ throw({bad_request, "unknown builtin filter name"});
+configure_filter("", main_only, _Req, _Db) ->
+ {default, main_only};
+configure_filter("", all_docs, _Req, _Db) ->
+ {default, all_docs};
+configure_filter(FilterName, Style, Req, Db) ->
+ FilterNameParts = string:tokens(FilterName, "/"),
+ case [?l2b(couch_httpd:unquote(Part)) || Part <- FilterNameParts] of
+ [DName, FName] ->
+ {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
+ check_member_exists(DDoc, [<<"filters">>, FName]),
+ {custom, Style, Req, DDoc, FName};
+ [] ->
+ {default, Style};
+ _Else ->
+ Msg = "`filter` must be of the form `designname/filtername`",
+ throw({bad_request, Msg})
end.
-builtin_filter_fun("_doc_ids", Style, {json_req, {Props}}, _Db) ->
- DocIds = couch_util:get_value(<<"doc_ids">>, Props),
- {filter_docids(DocIds, Style), DocIds};
-builtin_filter_fun("_doc_ids", Style, #httpd{method='POST'}=Req, _Db) ->
+
+filter(Db, #full_doc_info{}=FDI, Filter) ->
+ filter(Db, couch_doc:to_doc_info(FDI), Filter);
+filter(_Db, DocInfo, {default, Style}) ->
+ apply_style(DocInfo, Style);
+filter(_Db, DocInfo, {doc_ids, Style, DocIds}) ->
+ case lists:member(DocInfo#doc_info.id, DocIds) of
+ true ->
+ apply_style(DocInfo, Style);
+ false ->
+ []
+ end;
+filter(_Db, DocInfo, {design_docs, Style}) ->
+ case DocInfo#doc_info.id of
+ <<"_design", _/binary>> ->
+ apply_style(DocInfo, Style);
+ _ ->
+ []
+ end;
+filter(Db, DocInfo, {view, Style, DDoc, VName}) ->
+ Docs = open_revs(Db, DocInfo, Style),
+ {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs),
+ filter_revs(Passes, Docs);
+filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) ->
+ Req = case Req0 of
+ {json_req, _} -> Req0;
+ #httpd{} -> {json_req, couch_httpd_external:json_req_obj(Req0, Db)}
+ end,
+ Docs = open_revs(Db, DocInfo, Style),
+ {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
+ filter_revs(Passes, Docs).
+
+
+get_doc_ids({json_req, {Props}}) ->
+ check_docids(couch_util:get_value(<<"doc_ids">>, Props));
+get_doc_ids(#httpd{method='POST'}=Req) ->
{Props} = couch_httpd:json_body_obj(Req),
- DocIds = couch_util:get_value(<<"doc_ids">>, Props, nil),
- {filter_docids(DocIds, Style), DocIds};
-builtin_filter_fun("_doc_ids", Style, #httpd{method='GET'}=Req, _Db) ->
+ check_docids(couch_util:get_value(<<"doc_ids">>, Props));
+get_doc_ids(#httpd{method='GET'}=Req) ->
DocIds = ?JSON_DECODE(couch_httpd:qs_value(Req, "doc_ids", "null")),
- {filter_docids(DocIds, Style), DocIds};
-builtin_filter_fun("_design", Style, _Req, _Db) ->
- {filter_designdoc(Style), []};
-builtin_filter_fun("_view", Style, Req, Db) ->
- ViewName = couch_httpd:qs_value(Req, "view", ""),
- {filter_view(ViewName, Style, Db), []};
-builtin_filter_fun(_FilterName, _Style, _Req, _Db) ->
- throw({bad_request, "unknown builtin filter name"}).
-
-main_only_filter(#doc_info{revs=[#rev_info{rev=Rev}|_]}) ->
- [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}].
-
-all_docs_filter(#doc_info{revs=Revs}) ->
- [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || #rev_info{rev=Rev} <- Revs].
-
-filter_docids(DocIds, Style) when is_list(DocIds)->
- fun(_Db, #doc_info{id=DocId, revs=Revs}) ->
- case lists:member(DocId, DocIds) of
- true ->
- builtin_results(Style, Revs);
- _ -> []
- end
+ check_docids(DocIds);
+get_doc_ids(_) ->
+ throw({bad_request, no_doc_ids_provided}).
+
+
+check_docids(DocIds) when is_list(DocIds) ->
+ lists:foreach(fun
+ (DocId) when not is_binary(DocId) ->
+ Msg = "`doc_ids` filter parameter is not a list of binaries.",
+ throw({bad_request, Msg});
+ (_) -> ok
+ end, DocIds),
+ DocIds;
+check_docids(_) ->
+ Msg = "`doc_ids` filter parameter is not a list of binaries.",
+ throw({bad_request, Msg}).
+
+
+open_ddoc(#db{name= <<"shards/", _/binary>> =ShardName}, DDocId) ->
+ {_, Ref} = spawn_monitor(fun() ->
+ exit(fabric:open_doc(mem3:dbname(ShardName), DDocId, []))
+ end),
+ receive
+ {'DOWN', Ref, _, _, {ok, _}=Response} ->
+ Response;
+ {'DOWN', Ref, _, _, Response} ->
+ throw(Response)
end;
-filter_docids(_, _) ->
- throw({bad_request, "`doc_ids` filter parameter is not a list."}).
-
-filter_designdoc(Style) ->
- fun(_Db, #doc_info{id=DocId, revs=Revs}) ->
- case DocId of
- <<"_design", _/binary>> ->
- builtin_results(Style, Revs);
- _ -> []
- end
+open_ddoc(Db, DDocId) ->
+ case couch_db:open_doc(Db, DDocId, [ejson_body]) of
+ {ok, _} = Resp -> Resp;
+ Else -> throw(Else)
end.
-filter_view("", _Style, _Db) ->
- throw({bad_request, "`view` filter parameter is not provided."});
-filter_view(ViewName, Style, Db) ->
- case [list_to_binary(couch_httpd:unquote(Part))
- || Part <- string:tokens(ViewName, "/")] of
- [] ->
- throw({bad_request, "Invalid `view` parameter."});
- [DName, VName] ->
- DesignId = <<"_design/", DName/binary>>,
- DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, [ejson_body]),
- % validate that the ddoc has the filter fun
- #doc{body={Props}} = DDoc,
- couch_util:get_nested_json_value({Props}, [<<"views">>, VName]),
- fun(Db2, DocInfo) ->
- DocInfos =
- case Style of
- main_only ->
- [DocInfo];
- all_docs ->
- [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs]
- end,
- Docs = [Doc || {ok, Doc} <- [
- couch_db:open_doc(Db2, DocInfo2, [deleted, conflicts])
- || DocInfo2 <- DocInfos]],
- {ok, Passes} = couch_query_servers:filter_view(
- DDoc, VName, Docs
- ),
- [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
- || {Pass, #doc{revs={RevPos,[RevId|_]}}}
- <- lists:zip(Passes, Docs), Pass == true]
- end
- end.
-
-builtin_results(Style, [#rev_info{rev=Rev}|_]=Revs) ->
- case Style of
- main_only ->
- [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
- all_docs ->
- [{[{<<"rev">>, couch_doc:rev_to_str(R)}]}
- || #rev_info{rev=R} <- Revs]
- end.
+
+check_member_exists(#doc{body={Props}}, Path) ->
+ couch_util:get_nested_json_value({Props}, Path).
+
+
+apply_style(#doc_info{revs=Revs}, main_only) ->
+ [#rev_info{rev=Rev} | _] = Revs,
+ [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
+apply_style(#doc_info{revs=Revs}, all_docs) ->
+ [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev=R} <- Revs].
+
+
+open_revs(Db, DocInfo, Style) ->
+ DocInfos = case Style of
+ main_only -> [DocInfo];
+ all_docs -> [DocInfo#doc_info{revs=[R]}|| R <- DocInfo#doc_info.revs]
+ end,
+ OpenOpts = [deleted, conflicts],
+ % Relying on list comprehensions to silence errors
+ OpenResults = [couch_db:open_doc(Db, DI, OpenOpts) || DI <- DocInfos],
+ [Doc || {ok, Doc} <- OpenResults].
+
+
+filter_revs(Passes, Docs) ->
+ lists:flatmap(fun
+ ({true, #doc{revs={RevPos, [RevId | _]}}}) ->
+ RevStr = couch_doc:rev_to_str({RevPos, RevId}),
+ Change = {[{<<"rev">>, RevStr}]},
+ [Change];
+ (_) ->
+ []
+ end, lists:zip(Passes, Docs)).
+
get_changes_timeout(Args, Callback) ->
#changes_args{
@@ -292,13 +310,13 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) -
conflicts = Conflicts,
limit = Limit,
feed = ResponseType,
- filter_fun = FilterFun
+ filter_fun = Filter
} = Args,
#changes_acc{
db = Db,
seq = StartSeq,
prepend = Prepend,
- filter = FilterFun,
+ filter = Filter,
callback = Callback,
user_acc = UserAcc,
resp_type = ResponseType,
@@ -311,100 +329,81 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) -
send_changes(Args, Acc0, FirstRound) ->
#changes_args{
- dir = Dir,
- filter = FilterName,
- filter_args = FilterArgs
+ dir = Dir
} = Args,
#changes_acc{
db = Db,
- seq = StartSeq
+ seq = StartSeq,
+ filter = Filter
} = Acc0,
- case FirstRound of
- true ->
- case FilterName of
- "_doc_ids" when length(FilterArgs) =< ?MAX_DOC_IDS ->
- send_changes_doc_ids(
- FilterArgs, Db, StartSeq, Dir, fun changes_enumerator/2, Acc0);
- "_design" ->
- send_changes_design_docs(
- Db, StartSeq, Dir, fun changes_enumerator/2, Acc0);
+ EnumFun = fun ?MODULE:changes_enumerator/2,
+ case can_optimize(FirstRound, Filter) of
+ {true, Fun} ->
+ Fun(Db, StartSeq, Dir, EnumFun, Acc0, Filter);
_ ->
- couch_db:changes_since(
- Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0)
- end;
- false ->
- couch_db:changes_since(
- Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0)
+ couch_db:changes_since(Db, StartSeq, EnumFun, [{dir, Dir}], Acc0)
end.
-send_changes_doc_ids(DocIds, Db, StartSeq, Dir, Fun, Acc0) ->
+can_optimize(true, {doc_ids, _Style, DocIds})
+ when length(DocIds) =< ?MAX_DOC_IDS ->
+ {true, fun send_changes_doc_ids/6};
+can_optimize(true, {design_docs, _Style}) ->
+ {true, fun send_changes_design_docs/6};
+can_optimize(_, _) ->
+ false.
+
+
+send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) ->
Lookups = couch_btree:lookup(Db#db.id_tree, DocIds),
- FullDocInfos = lists:foldl(
- fun({ok, FDI}, Acc) ->
- [FDI | Acc];
- (not_found, Acc) ->
- Acc
- end,
- [], Lookups),
- send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0).
+ FullInfos = lists:foldl(fun
+ ({ok, FDI}, Acc) -> [FDI | Acc];
+ (not_found, Acc) -> Acc
+ end, [], Lookups),
+ send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
-send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0) ->
+send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) ->
FoldFun = fun(FullDocInfo, _, Acc) ->
{ok, [FullDocInfo | Acc]}
end,
KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
- {ok, _, FullDocInfos} = couch_btree:fold(
- Db#db.id_tree, FoldFun, [], KeyOpts),
- send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0).
+ {ok, _, FullInfos} = couch_btree:fold(Db#db.id_tree, FoldFun, [], KeyOpts),
+ send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
FoldFun = case Dir of
- fwd ->
- fun lists:foldl/3;
- rev ->
- fun lists:foldr/3
+ fwd -> fun lists:foldl/3;
+ rev -> fun lists:foldr/3
end,
GreaterFun = case Dir of
- fwd ->
- fun(A, B) -> A > B end;
- rev ->
- fun(A, B) -> A =< B end
+ fwd -> fun(A, B) -> A > B end;
+ rev -> fun(A, B) -> A =< B end
end,
- DocInfos = lists:foldl(
- fun(FDI, Acc) ->
- DI = couch_doc:to_doc_info(FDI),
- case GreaterFun(DI#doc_info.high_seq, StartSeq) of
- true ->
- [DI | Acc];
- false ->
- Acc
- end
- end,
- [], FullDocInfos),
+ DocInfos = lists:foldl(fun(FDI, Acc) ->
+ DI = couch_doc:to_doc_info(FDI),
+ case GreaterFun(DI#doc_info.high_seq, StartSeq) of
+ true -> [DI | Acc];
+ false -> Acc
+ end
+ end, [], FullDocInfos),
SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos),
FinalAcc = try
- FoldFun(
- fun(DocInfo, Acc) ->
- case Fun(DocInfo, Acc) of
+ FoldFun(fun(DocInfo, Acc) ->
+ case Fun(DocInfo, Acc) of
{ok, NewAcc} ->
NewAcc;
{stop, NewAcc} ->
throw({stop, NewAcc})
- end
- end,
- Acc0, SortedDocInfos)
+ end
+ end, Acc0, SortedDocInfos)
catch
- throw:{stop, Acc} ->
- Acc
+ {stop, Acc} -> Acc
end,
case Dir of
- fwd ->
- {ok, FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)}};
- rev ->
- {ok, FinalAcc}
+ fwd -> {ok, FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)}};
+ rev -> {ok, FinalAcc}
end.
@@ -458,12 +457,12 @@ changes_enumerator(DocInfo, #changes_acc{resp_type = ResponseType} = Acc)
when ResponseType =:= "continuous"
orelse ResponseType =:= "eventsource" ->
#changes_acc{
- filter = FilterFun, callback = Callback,
+ filter = Filter, callback = Callback,
user_acc = UserAcc, limit = Limit, db = Db,
timeout = Timeout, timeout_fun = TimeoutFun
} = Acc,
#doc_info{high_seq = Seq} = DocInfo,
- Results0 = FilterFun(Db, DocInfo),
+ Results0 = filter(Db, DocInfo, Filter),
Results = [Result || Result <- Results0, Result /= null],
%% TODO: I'm thinking this should be < 1 and not =< 1
Go = if Limit =< 1 -> stop; true -> ok end,
@@ -484,12 +483,12 @@ changes_enumerator(DocInfo, #changes_acc{resp_type = ResponseType} = Acc)
end;
changes_enumerator(DocInfo, Acc) ->
#changes_acc{
- filter = FilterFun, callback = Callback, prepend = Prepend,
+ filter = Filter, callback = Callback, prepend = Prepend,
user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
timeout = Timeout, timeout_fun = TimeoutFun
} = Acc,
#doc_info{high_seq = Seq} = DocInfo,
- Results0 = FilterFun(Db, DocInfo),
+ Results0 = filter(Db, DocInfo, Filter),
Results = [Result || Result <- Results0, Result /= null],
Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
case Results of