You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2014/02/10 21:22:48 UTC

[26/50] couch commit: updated refs/heads/import to 09c6556

Enable changes for cluster access

This mostly involves rewriting filter functions so that they aren't
anonymous closures. Anonymous closures break clusters when code is
upgraded, which is particularly problematic for the _changes feed since
it's a long-lived request.


Project: http://git-wip-us.apache.org/repos/asf/couchdb-couch/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-couch/commit/e09b8074
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-couch/tree/e09b8074
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-couch/diff/e09b8074

Branch: refs/heads/import
Commit: e09b8074fec59a508905b700c5252df7eb5b5338
Parents: 15b84c0
Author: Paul J. Davis <pa...@gmail.com>
Authored: Mon Mar 11 13:39:42 2013 -0500
Committer: Paul J. Davis <pa...@gmail.com>
Committed: Tue Feb 4 17:03:25 2014 -0600

----------------------------------------------------------------------
 src/couch_changes.erl | 393 ++++++++++++++++++++++-----------------------
 1 file changed, 196 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-couch/blob/e09b8074/src/couch_changes.erl
----------------------------------------------------------------------
diff --git a/src/couch_changes.erl b/src/couch_changes.erl
index d36b45f..4346109 100644
--- a/src/couch_changes.erl
+++ b/src/couch_changes.erl
@@ -18,11 +18,12 @@
     get_changes_timeout/2,
     wait_db_updated/3,
     get_rest_db_updated/1,
-    make_filter_fun/4,
-    main_only_filter/1,
-    all_docs_filter/1
+    configure_filter/4,
+    filter/3
 ]).
 
+-export([changes_enumerator/2]).
+
 % For the builtin filter _docs_ids, this is the maximum number
 % of documents for which we trigger the optimized code path.
 -define(MAX_DOC_IDS, 100).
@@ -51,8 +52,8 @@ handle_changes(Args1, Req, Db0) ->
         dir = Dir,
         since = Since
     } = Args1,
-    {FilterFun, FilterArgs} = make_filter_fun(FilterName, Style, Req, Db0),
-    Args = Args1#changes_args{filter_fun = FilterFun, filter_args = FilterArgs},
+    Filter = configure_filter(FilterName, Style, Req, Db0),
+    Args = Args1#changes_args{filter_fun = Filter},
     Start = fun() ->
         {ok, Db} = couch_db:reopen(Db0),
         StartSeq = case Dir of
@@ -120,136 +121,153 @@ get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) ->
 get_callback_acc(Callback) when is_function(Callback, 2) ->
     {fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}.
 
-make_filter_fun([$_ | _] = FilterName, Style, Req, Db) ->
-    builtin_filter_fun(FilterName, Style, Req, Db);
-make_filter_fun(_, main_only, _, _) ->
-    fun ?MODULE:main_only_filter/1;
-make_filter_fun(_, all_docs, _, _) ->
-    fun ?MODULE:all_docs_filter/1;
-make_filter_fun(FilterName, Style, Req, Db) ->
-    {os_filter_fun(FilterName, Style, Req, Db), []}.
-
-os_filter_fun(FilterName, Style, Req, Db) ->
-    case [list_to_binary(couch_httpd:unquote(Part))
-            || Part <- string:tokens(FilterName, "/")] of
-    [] ->
-        fun(_Db2, #doc_info{revs=Revs}) ->
-                builtin_results(Style, Revs)
-        end;
-    [DName, FName] ->
-        DesignId = <<"_design/", DName/binary>>,
-        DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, [ejson_body]),
-        % validate that the ddoc has the filter fun
-        #doc{body={Props}} = DDoc,
-        couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
-        fun(Db2, DocInfo) ->
-            DocInfos =
-            case Style of
-            main_only ->
-                [DocInfo];
-            all_docs ->
-                [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs]
-            end,
-            Docs = [Doc || {ok, Doc} <- [
-                    couch_db:open_doc(Db2, DocInfo2, [deleted, conflicts])
-                        || DocInfo2 <- DocInfos]],
-            {ok, Passes} = couch_query_servers:filter_docs(
-                Req, Db2, DDoc, FName, Docs
-            ),
-            [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
-                || {Pass, #doc{revs={RevPos,[RevId|_]}}}
-                <- lists:zip(Passes, Docs), Pass == true]
-        end;
-    _Else ->
-        throw({bad_request,
-            "filter parameter must be of the form `designname/filtername`"})
+
+configure_filter("_doc_ids", Style, Req, _Db) ->
+    {doc_ids, Style, get_doc_ids(Req)};
+configure_filter("_design", Style, _Req, _Db) ->
+    {design_docs, Style};
+configure_filter("_view", Style, Req, Db) ->
+    ViewName = couch_httpd:qs_value(Req, "view", ""),
+    if ViewName /= "" -> ok; true ->
+        throw({bad_request, "`view` filter parameter is not provided."})
+    end,
+    ViewNameParts = string:tokens(ViewName, "/"),
+    case [?l2b(couch_httpd:unquote(Part)) || Part <- ViewNameParts] of
+        [DName, VName] ->
+            {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
+            check_member_exists(DDoc, [<<"views">>, VName]),
+            {view, Style, DDoc, VName};
+        [] ->
+            Msg = "`view` must be of the form `designname/viewname`",
+            throw({bad_request, Msg})
+    end;
+configure_filter([$_ | _], _Style, _Req, _Db) ->
+    throw({bad_request, "unknown builtin filter name"});
+configure_filter("", main_only, _Req, _Db) ->
+    {default, main_only};
+configure_filter("", all_docs, _Req, _Db) ->
+    {default, all_docs};
+configure_filter(FilterName, Style, Req, Db) ->
+    FilterNameParts = string:tokens(FilterName, "/"),
+    case [?l2b(couch_httpd:unquote(Part)) || Part <- FilterNameParts] of
+        [DName, FName] ->
+            {ok, DDoc} = open_ddoc(Db, <<"_design/", DName/binary>>),
+            check_member_exists(DDoc, [<<"filters">>, FName]),
+            {custom, Style, Req, DDoc, FName};
+        [] ->
+            {default, Style};
+        _Else ->
+            Msg = "`filter` must be of the form `designname/filtername`",
+            throw({bad_request, Msg})
     end.
 
-builtin_filter_fun("_doc_ids", Style, {json_req, {Props}}, _Db) ->
-    DocIds = couch_util:get_value(<<"doc_ids">>, Props),
-    {filter_docids(DocIds, Style), DocIds};
-builtin_filter_fun("_doc_ids", Style, #httpd{method='POST'}=Req, _Db) ->
+
+filter(Db, #full_doc_info{}=FDI, Filter) ->
+    filter(Db, couch_doc:to_doc_info(FDI), Filter);
+filter(_Db, DocInfo, {default, Style}) ->
+    apply_style(DocInfo, Style);
+filter(_Db, DocInfo, {doc_ids, Style, DocIds}) ->
+    case lists:member(DocInfo#doc_info.id, DocIds) of
+        true ->
+            apply_style(DocInfo, Style);
+        false ->
+            []
+    end;
+filter(_Db, DocInfo, {design_docs, Style}) ->
+    case DocInfo#doc_info.id of
+        <<"_design", _/binary>> ->
+            apply_style(DocInfo, Style);
+        _ ->
+            []
+    end;
+filter(Db, DocInfo, {view, Style, DDoc, VName}) ->
+    Docs = open_revs(Db, DocInfo, Style),
+    {ok, Passes} = couch_query_servers:filter_view(DDoc, VName, Docs),
+    filter_revs(Passes, Docs);
+filter(Db, DocInfo, {custom, Style, Req0, DDoc, FName}) ->
+    Req = case Req0 of
+        {json_req, _} -> Req0;
+        #httpd{} -> {json_req, couch_httpd_external:json_req_obj(Req0, Db)}
+    end,
+    Docs = open_revs(Db, DocInfo, Style),
+    {ok, Passes} = couch_query_servers:filter_docs(Req, Db, DDoc, FName, Docs),
+    filter_revs(Passes, Docs).
+
+
+get_doc_ids({json_req, {Props}}) ->
+    check_docids(couch_util:get_value(<<"doc_ids">>, Props));
+get_doc_ids(#httpd{method='POST'}=Req) ->
     {Props} = couch_httpd:json_body_obj(Req),
-    DocIds =  couch_util:get_value(<<"doc_ids">>, Props, nil),
-    {filter_docids(DocIds, Style), DocIds};
-builtin_filter_fun("_doc_ids", Style, #httpd{method='GET'}=Req, _Db) ->
+    check_docids(couch_util:get_value(<<"doc_ids">>, Props));
+get_doc_ids(#httpd{method='GET'}=Req) ->
     DocIds = ?JSON_DECODE(couch_httpd:qs_value(Req, "doc_ids", "null")),
-    {filter_docids(DocIds, Style), DocIds};
-builtin_filter_fun("_design", Style, _Req, _Db) ->
-    {filter_designdoc(Style), []};
-builtin_filter_fun("_view", Style, Req, Db) ->
-    ViewName = couch_httpd:qs_value(Req, "view", ""),
-    {filter_view(ViewName, Style, Db), []};
-builtin_filter_fun(_FilterName, _Style, _Req, _Db) ->
-    throw({bad_request, "unknown builtin filter name"}).
-
-main_only_filter(#doc_info{revs=[#rev_info{rev=Rev}|_]}) ->
-    [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}].
-
-all_docs_filter(#doc_info{revs=Revs}) ->
-    [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} || #rev_info{rev=Rev} <- Revs].
-
-filter_docids(DocIds, Style) when is_list(DocIds)->
-    fun(_Db, #doc_info{id=DocId, revs=Revs}) ->
-            case lists:member(DocId, DocIds) of
-                true ->
-                    builtin_results(Style, Revs);
-                _ -> []
-            end
+    check_docids(DocIds);
+get_doc_ids(_) ->
+    throw({bad_request, no_doc_ids_provided}).
+
+
+check_docids(DocIds) when is_list(DocIds) ->
+    lists:foreach(fun
+        (DocId) when not is_binary(DocId) ->
+            Msg = "`doc_ids` filter parameter is not a list of binaries.",
+            throw({bad_request, Msg});
+        (_) -> ok
+    end, DocIds),
+    DocIds;
+check_docids(_) ->
+    Msg = "`doc_ids` filter parameter is not a list of binaries.",
+    throw({bad_request, Msg}).
+
+
+open_ddoc(#db{name= <<"shards/", _/binary>> =ShardName}, DDocId) ->
+    {_, Ref} = spawn_monitor(fun() ->
+        exit(fabric:open_doc(mem3:dbname(ShardName), DDocId, []))
+    end),
+    receive
+        {'DOWN', Ref, _, _, {ok, _}=Response} ->
+            Response;
+        {'DOWN', Ref, _, _, Response} ->
+            throw(Response)
     end;
-filter_docids(_, _) ->
-    throw({bad_request, "`doc_ids` filter parameter is not a list."}).
-
-filter_designdoc(Style) ->
-    fun(_Db, #doc_info{id=DocId, revs=Revs}) ->
-            case DocId of
-            <<"_design", _/binary>> ->
-                    builtin_results(Style, Revs);
-                _ -> []
-            end
+open_ddoc(Db, DDocId) ->
+    case couch_db:open_doc(Db, DDocId, [ejson_body]) of
+        {ok, _} = Resp -> Resp;
+        Else -> throw(Else)
     end.
 
-filter_view("", _Style, _Db) ->
-    throw({bad_request, "`view` filter parameter is not provided."});
-filter_view(ViewName, Style, Db) ->
-    case [list_to_binary(couch_httpd:unquote(Part))
-            || Part <- string:tokens(ViewName, "/")] of
-        [] ->
-            throw({bad_request, "Invalid `view` parameter."});
-        [DName, VName] ->
-            DesignId = <<"_design/", DName/binary>>,
-            DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, [ejson_body]),
-            % validate that the ddoc has the filter fun
-            #doc{body={Props}} = DDoc,
-            couch_util:get_nested_json_value({Props}, [<<"views">>, VName]),
-            fun(Db2, DocInfo) ->
-                DocInfos =
-                case Style of
-                main_only ->
-                    [DocInfo];
-                all_docs ->
-                    [DocInfo#doc_info{revs=[Rev]}|| Rev <- DocInfo#doc_info.revs]
-                end,
-                Docs = [Doc || {ok, Doc} <- [
-                        couch_db:open_doc(Db2, DocInfo2, [deleted, conflicts])
-                            || DocInfo2 <- DocInfos]],
-                {ok, Passes} = couch_query_servers:filter_view(
-                    DDoc, VName, Docs
-                ),
-                [{[{<<"rev">>, couch_doc:rev_to_str({RevPos,RevId})}]}
-                    || {Pass, #doc{revs={RevPos,[RevId|_]}}}
-                    <- lists:zip(Passes, Docs), Pass == true]
-            end
-        end.
-
-builtin_results(Style, [#rev_info{rev=Rev}|_]=Revs) ->
-    case Style of
-        main_only ->
-            [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
-        all_docs ->
-            [{[{<<"rev">>, couch_doc:rev_to_str(R)}]}
-                || #rev_info{rev=R} <- Revs]
-    end.
+
+check_member_exists(#doc{body={Props}}, Path) ->
+    couch_util:get_nested_json_value({Props}, Path).
+
+
+apply_style(#doc_info{revs=Revs}, main_only) ->
+    [#rev_info{rev=Rev} | _] = Revs,
+    [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}];
+apply_style(#doc_info{revs=Revs}, all_docs) ->
+    [{[{<<"rev">>, couch_doc:rev_to_str(R)}]} || #rev_info{rev=R} <- Revs].
+
+
+open_revs(Db, DocInfo, Style) ->
+    DocInfos = case Style of
+        main_only -> [DocInfo];
+        all_docs -> [DocInfo#doc_info{revs=[R]}|| R <- DocInfo#doc_info.revs]
+    end,
+    OpenOpts = [deleted, conflicts],
+    % Relying on list comprehensions to silence errors
+    OpenResults = [couch_db:open_doc(Db, DI, OpenOpts) || DI <- DocInfos],
+    [Doc || {ok, Doc} <- OpenResults].
+
+
+filter_revs(Passes, Docs) ->
+    lists:flatmap(fun
+        ({true, #doc{revs={RevPos, [RevId | _]}}}) ->
+            RevStr = couch_doc:rev_to_str({RevPos, RevId}),
+            Change = {[{<<"rev">>, RevStr}]},
+            [Change];
+        (_) ->
+            []
+    end, lists:zip(Passes, Docs)).
+
 
 get_changes_timeout(Args, Callback) ->
     #changes_args{
@@ -292,13 +310,13 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) -
         conflicts = Conflicts,
         limit = Limit,
         feed = ResponseType,
-        filter_fun = FilterFun
+        filter_fun = Filter
     } = Args,
     #changes_acc{
         db = Db,
         seq = StartSeq,
         prepend = Prepend,
-        filter = FilterFun,
+        filter = Filter,
         callback = Callback,
         user_acc = UserAcc,
         resp_type = ResponseType,
@@ -311,100 +329,81 @@ build_acc(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout, TimeoutFun) -
 
 send_changes(Args, Acc0, FirstRound) ->
     #changes_args{
-        dir = Dir,
-        filter = FilterName,
-        filter_args = FilterArgs
+        dir = Dir
     } = Args,
     #changes_acc{
         db = Db,
-        seq = StartSeq
+        seq = StartSeq,
+        filter = Filter
     } = Acc0,
-    case FirstRound of
-    true ->
-        case FilterName of
-        "_doc_ids" when length(FilterArgs) =< ?MAX_DOC_IDS ->
-            send_changes_doc_ids(
-                FilterArgs, Db, StartSeq, Dir, fun changes_enumerator/2, Acc0);
-        "_design" ->
-            send_changes_design_docs(
-                Db, StartSeq, Dir, fun changes_enumerator/2, Acc0);
+    EnumFun = fun ?MODULE:changes_enumerator/2,
+    case can_optimize(FirstRound, Filter) of
+        {true, Fun} ->
+            Fun(Db, StartSeq, Dir, EnumFun, Acc0, Filter);
         _ ->
-            couch_db:changes_since(
-                Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0)
-        end;
-    false ->
-        couch_db:changes_since(
-            Db, StartSeq, fun changes_enumerator/2, [{dir, Dir}], Acc0)
+            couch_db:changes_since(Db, StartSeq, EnumFun, [{dir, Dir}], Acc0)
     end.
 
 
-send_changes_doc_ids(DocIds, Db, StartSeq, Dir, Fun, Acc0) ->
+can_optimize(true, {doc_ids, _Style, DocIds})
+        when length(DocIds) =< ?MAX_DOC_IDS ->
+    {true, fun send_changes_doc_ids/6};
+can_optimize(true, {design_docs, _Style}) ->
+    {true, fun send_changes_design_docs/6};
+can_optimize(_, _) ->
+    false.
+
+
+send_changes_doc_ids(Db, StartSeq, Dir, Fun, Acc0, {doc_ids, _Style, DocIds}) ->
     Lookups = couch_btree:lookup(Db#db.id_tree, DocIds),
-    FullDocInfos = lists:foldl(
-        fun({ok, FDI}, Acc) ->
-            [FDI | Acc];
-        (not_found, Acc) ->
-            Acc
-        end,
-        [], Lookups),
-    send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0).
+    FullInfos = lists:foldl(fun
+        ({ok, FDI}, Acc) -> [FDI | Acc];
+        (not_found, Acc) -> Acc
+    end, [], Lookups),
+    send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
 
 
-send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0) ->
+send_changes_design_docs(Db, StartSeq, Dir, Fun, Acc0, {design_docs, _Style}) ->
     FoldFun = fun(FullDocInfo, _, Acc) ->
         {ok, [FullDocInfo | Acc]}
     end,
     KeyOpts = [{start_key, <<"_design/">>}, {end_key_gt, <<"_design0">>}],
-    {ok, _, FullDocInfos} = couch_btree:fold(
-        Db#db.id_tree, FoldFun, [], KeyOpts),
-    send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0).
+    {ok, _, FullInfos} = couch_btree:fold(Db#db.id_tree, FoldFun, [], KeyOpts),
+    send_lookup_changes(FullInfos, StartSeq, Dir, Db, Fun, Acc0).
 
 
 send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
     FoldFun = case Dir of
-    fwd ->
-        fun lists:foldl/3;
-    rev ->
-        fun lists:foldr/3
+        fwd -> fun lists:foldl/3;
+        rev -> fun lists:foldr/3
     end,
     GreaterFun = case Dir of
-    fwd ->
-        fun(A, B) -> A > B end;
-    rev ->
-        fun(A, B) -> A =< B end
+        fwd -> fun(A, B) -> A > B end;
+        rev -> fun(A, B) -> A =< B end
     end,
-    DocInfos = lists:foldl(
-        fun(FDI, Acc) ->
-            DI = couch_doc:to_doc_info(FDI),
-            case GreaterFun(DI#doc_info.high_seq, StartSeq) of
-            true ->
-                [DI | Acc];
-            false ->
-                Acc
-            end
-        end,
-        [], FullDocInfos),
+    DocInfos = lists:foldl(fun(FDI, Acc) ->
+        DI = couch_doc:to_doc_info(FDI),
+        case GreaterFun(DI#doc_info.high_seq, StartSeq) of
+            true -> [DI | Acc];
+            false -> Acc
+        end
+    end, [], FullDocInfos),
     SortedDocInfos = lists:keysort(#doc_info.high_seq, DocInfos),
     FinalAcc = try
-        FoldFun(
-            fun(DocInfo, Acc) ->
-                case Fun(DocInfo, Acc) of
+        FoldFun(fun(DocInfo, Acc) ->
+            case Fun(DocInfo, Acc) of
                 {ok, NewAcc} ->
                     NewAcc;
                 {stop, NewAcc} ->
                     throw({stop, NewAcc})
-                end
-            end,
-            Acc0, SortedDocInfos)
+            end
+        end, Acc0, SortedDocInfos)
     catch
-    throw:{stop, Acc} ->
-        Acc
+        {stop, Acc} -> Acc
     end,
     case Dir of
-    fwd ->
-        {ok, FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)}};
-    rev ->
-        {ok, FinalAcc}
+        fwd -> {ok, FinalAcc#changes_acc{seq = couch_db:get_update_seq(Db)}};
+        rev -> {ok, FinalAcc}
     end.
 
 
@@ -458,12 +457,12 @@ changes_enumerator(DocInfo, #changes_acc{resp_type = ResponseType} = Acc)
         when ResponseType =:= "continuous"
         orelse ResponseType =:= "eventsource" ->
     #changes_acc{
-        filter = FilterFun, callback = Callback,
+        filter = Filter, callback = Callback,
         user_acc = UserAcc, limit = Limit, db = Db,
         timeout = Timeout, timeout_fun = TimeoutFun
     } = Acc,
     #doc_info{high_seq = Seq} = DocInfo,
-    Results0 = FilterFun(Db, DocInfo),
+    Results0 = filter(Db, DocInfo, Filter),
     Results = [Result || Result <- Results0, Result /= null],
     %% TODO: I'm thinking this should be < 1 and not =< 1
     Go = if Limit =< 1 -> stop; true -> ok end,
@@ -484,12 +483,12 @@ changes_enumerator(DocInfo, #changes_acc{resp_type = ResponseType} = Acc)
     end;
 changes_enumerator(DocInfo, Acc) ->
     #changes_acc{
-        filter = FilterFun, callback = Callback, prepend = Prepend,
+        filter = Filter, callback = Callback, prepend = Prepend,
         user_acc = UserAcc, limit = Limit, resp_type = ResponseType, db = Db,
         timeout = Timeout, timeout_fun = TimeoutFun
     } = Acc,
     #doc_info{high_seq = Seq} = DocInfo,
-    Results0 = FilterFun(Db, DocInfo),
+    Results0 = filter(Db, DocInfo, Filter),
     Results = [Result || Result <- Results0, Result /= null],
     Go = if (Limit =< 1) andalso Results =/= [] -> stop; true -> ok end,
     case Results of