You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by jc...@apache.org on 2010/02/26 02:16:32 UTC

svn commit: r916521 - /couchdb/trunk/src/couchdb/couch_changes.erl

Author: jchris
Date: Fri Feb 26 01:16:32 2010
New Revision: 916521

URL: http://svn.apache.org/viewvc?rev=916521&view=rev
Log:
I will not forget to run git add, I will not forget to run git add

Added:
    couchdb/trunk/src/couchdb/couch_changes.erl

Added: couchdb/trunk/src/couchdb/couch_changes.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_changes.erl?rev=916521&view=auto
==============================================================================
--- couchdb/trunk/src/couchdb/couch_changes.erl (added)
+++ couchdb/trunk/src/couchdb/couch_changes.erl Fri Feb 26 01:16:32 2010
@@ -0,0 +1,257 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+-module(couch_changes).
+-include("couch_db.hrl").
+
+-export([handle_changes/3]).
+
+%% @type Req -> #httpd{} | {json_req, JsonObj()}
+%%
+%% Prepares a changes feed for Db and returns a fun of one argument. Calling
+%% that fun with a Callback does the actual work; Callback is invoked as
+%% Callback(Event, Feed), where the events produced by this module are
+%% start, {change, Row, Prepend}, timeout and {stop, LastSeq}.
+%%
+%% The filter named in Args1 is resolved eagerly, so errors raised while
+%% building the filter fun surface from this call, not from the returned fun.
+handle_changes(#changes_args{}=Args1, Req, Db) ->
+    FilterFun = make_filter_fun(Args1, Req, Db),
+    Args = Args1#changes_args{filter=FilterFun},
+    #changes_args{feed=Feed, dir=Dir} = Args,
+    % a reverse feed starts at the database's current update sequence,
+    % a forward feed at the requested "since" value
+    StartSeq = case Dir of
+    fwd ->
+        Args#changes_args.since;
+    rev ->
+        couch_db:get_update_seq(Db)
+    end,
+    IsStreaming = (Feed == "continuous") orelse (Feed == "longpoll"),
+    case IsStreaming of
+    true ->
+        fun(Callback) ->
+            stream_changes(Args, Callback, Db, StartSeq)
+        end;
+    false ->
+        fun(Callback) ->
+            send_changes_once(Args, Callback, Db, StartSeq)
+        end
+    end.
+
+%% Body of the fun returned for "continuous" and "longpoll" feeds. Registers
+%% an update notifier that forwards db_updated messages for this database to
+%% the calling process, then keeps sending changes until the feed ends.
+stream_changes(Args, Callback, Db, StartSeq) ->
+    start_sending_changes(Callback, Args#changes_args.feed),
+    Self = self(),
+    NotifyFun = fun({_, DbName}) when DbName == Db#db.name ->
+            Self ! db_updated;
+        (_) ->
+            ok
+        end,
+    {ok, Notify} = couch_db_update_notifier:start_link(NotifyFun),
+    {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
+    couch_stats_collector:track_process_count(
+        Self, {httpd, clients_requesting_changes}),
+    try
+        keep_sending_changes(
+            Args, Callback, Db, StartSeq, <<"">>, Timeout, TimeoutFun)
+    after
+        % always runs, even if sending crashed; its value is discarded
+        couch_db_update_notifier:stop(Notify),
+        get_rest_db_updated() % clean out any remaining update messages
+    end.
+
+%% Body of the fun returned for every other feed type: one pass over the
+%% changes. The enumeration itself runs with feed="normal", while the start
+%% and stop events still carry the feed value the caller asked for.
+send_changes_once(Args, Callback, Db, StartSeq) ->
+    RequestedFeed = Args#changes_args.feed,
+    start_sending_changes(Callback, RequestedFeed),
+    NormalArgs = Args#changes_args{feed="normal"},
+    {ok, {_, LastSeq, _Prepend, _, _, _, _, _}} =
+        send_changes(NormalArgs, Callback, Db, StartSeq, <<"">>),
+    end_sending_changes(Callback, LastSeq, RequestedFeed).
+
+%% @type Req -> #httpd{} | {json_req, JsonObj()}
+%%
+%% Builds the filter fun stored in #changes_args.filter. The returned fun
+%% takes a list of #doc_info{} records and returns one {[{<<"rev">>, RevStr}]}
+%% entry per doc info that should be reported.
+%%
+%% FilterName is split on "/" and each part is URL-unquoted:
+%%   * no parts (empty name): every doc info is reported;
+%%   * two parts "designname/filtername": the design document is opened and
+%%     checked for the filter function once, here, and the returned fun runs
+%%     the documents through couch_query_servers:filter_docs/5;
+%%   * anything else: throws {bad_request, Message}.
+make_filter_fun(#changes_args{filter=FilterName}, Req, Db) ->
+    case [list_to_binary(couch_httpd:unquote(Part))
+            || Part <- string:tokens(FilterName, "/")] of
+    [] ->
+        fun(DocInfos) ->
+        % doing this as a batch is more efficient for external filters
+            [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]} ||
+                #doc_info{revs=[#rev_info{rev=Rev}|_]} <- DocInfos]
+        end;
+    [DName, FName] ->
+        DesignId = <<"_design/", DName/binary>>,
+        DDoc = couch_httpd_db:couch_doc_open(Db, DesignId, nil, []),
+        % validate that the ddoc has the filter fun
+        #doc{body={Props}} = DDoc,
+        couch_util:get_nested_json_value({Props}, [<<"filters">>, FName]),
+        fun(DocInfos) ->
+            % every doc must open; a failure here crashes on purpose
+            Docs = [begin
+                    {ok, Doc} = couch_db:open_doc(
+                        Db, DInfo, [deleted, conflicts]),
+                    Doc
+                end || DInfo <- DocInfos],
+            {ok, Passes} = couch_query_servers:filter_docs(
+                Req, Db, DDoc, FName, Docs
+            ),
+            % Pair each verdict with the doc info it was computed for.
+            % Iterating DocInfos and Passes as two independent generators
+            % would build a cross product: every rev repeated once per
+            % passing verdict, whether or not that rev itself passed.
+            % NOTE(review): assumes filter_docs/5 returns exactly one
+            % verdict per document, in order -- lists:zip/2 fails fast
+            % if that does not hold.
+            [{[{<<"rev">>, couch_doc:rev_to_str(Rev)}]}
+                || {Pass, #doc_info{revs=[#rev_info{rev=Rev}|_]}}
+                    <- lists:zip(Passes, DocInfos),
+                Pass == true]
+        end;
+    _Else ->
+        throw({bad_request,
+            "filter parameter must be of the form `designname/filtername`"})
+    end.
+
+%% Works out how long to wait for a database update and what to do when that
+%% wait expires. Returns {TimeoutMs, TimeoutFun}, where TimeoutFun() yields
+%% stop (give up) or ok (keep waiting).
+%%
+%% The "httpd"/"changes_timeout" config value (default "60000") is an upper
+%% bound on the wait. Without a heartbeat the feed stops on expiry; with a
+%% heartbeat the Callback receives a timeout event and waiting goes on.
+get_changes_timeout(Args, Callback) ->
+    #changes_args{
+        heartbeat = Heartbeat,
+        timeout = Timeout,
+        feed = ResponseType
+    } = Args,
+    ConfiguredMax = couch_config:get("httpd", "changes_timeout", "60000"),
+    DefaultTimeout = list_to_integer(ConfiguredMax),
+    StopFun = fun() -> stop end,
+    HeartbeatFun = fun() -> Callback(timeout, ResponseType), ok end,
+    case {Heartbeat, Timeout} of
+    {undefined, undefined} ->
+        {DefaultTimeout, StopFun};
+    {undefined, _} ->
+        {lists:min([DefaultTimeout, Timeout]), StopFun};
+    {true, _} ->
+        % heartbeat requested without an interval: use the default
+        {DefaultTimeout, HeartbeatFun};
+    {_, _} ->
+        {lists:min([DefaultTimeout, Heartbeat]), HeartbeatFun}
+    end.
+
+%% Signals the start of a feed to Callback. A "continuous" feed gets no start
+%% event and the call simply returns ok; any other feed type returns whatever
+%% Callback(start, ResponseType) returns.
+start_sending_changes(Callback, ResponseType) ->
+    case ResponseType of
+    "continuous" ->
+        ok;
+    _ ->
+        Callback(start, ResponseType)
+    end.
+
+%% Runs one pass over the changes of Db starting at StartSeq, feeding each
+%% batch of doc infos to changes_enumerator/2. Returns the result of
+%% couch_db:changes_since/6 unchanged.
+%%
+%% The accumulator handed to the enumerator is the 8-tuple
+%% {Db, Seq, Prepend, FilterFun, Callback, ResponseType, Limit, IncludeDocs}.
+send_changes(Args, Callback, Db, StartSeq, Prepend) ->
+    #changes_args{
+        style = Style,
+        include_docs = IncludeDocs,
+        limit = Limit,
+        feed = ResponseType,
+        dir = Dir,
+        filter = FilterFun
+    } = Args,
+    FoldOptions = [{dir, Dir}],
+    InitialAcc = {Db, StartSeq, Prepend, FilterFun, Callback, ResponseType,
+        Limit, IncludeDocs},
+    couch_db:changes_since(
+        Db, Style, StartSeq, fun changes_enumerator/2, FoldOptions, InitialAcc
+    ).
+
+%% Loop behind "continuous" and "longpoll" feeds. Each round sends the
+%% changes after StartSeq (always in forward direction), closes Db, and then
+%% either finishes the feed or waits for the next db_updated message.
+%%
+%% The feed finishes when:
+%%   * it is a "longpoll" feed and this round used up some of the limit
+%%     (the limit after the round is lower than before it);
+%%   * wait_db_updated/2 returns stop;
+%%   * the database cannot be reopened after an update.
+keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout,
+    TimeoutFun) ->
+
+    #changes_args{
+        feed = ResponseType,
+        limit = Limit
+    } = Args,
+    ForwardArgs = Args#changes_args{dir=fwd},
+    {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} =
+        send_changes(ForwardArgs, Callback, Db, StartSeq, Prepend),
+    couch_db:close(Db),
+    LongpollDone = (Limit > NewLimit) andalso (ResponseType == "longpoll"),
+    case LongpollDone of
+    true ->
+        end_sending_changes(Callback, EndSeq, ResponseType);
+    false ->
+        case wait_db_updated(Timeout, TimeoutFun) of
+        stop ->
+            end_sending_changes(Callback, EndSeq, ResponseType);
+        updated ->
+            continue_after_update(
+                Args#changes_args{limit=NewLimit},
+                Callback,
+                Db,
+                EndSeq,
+                Prepend2,
+                Timeout,
+                TimeoutFun
+            )
+        end
+    end.
+
+%% Reopens the database by name, with the user context of the closed handle,
+%% and starts the next round from EndSeq. If the open does not return
+%% {ok, Db2}, the feed is ended instead.
+continue_after_update(Args, Callback, ClosedDb, EndSeq, Prepend, Timeout,
+    TimeoutFun) ->
+
+    #db{name = DbName, user_ctx = UserCtx} = ClosedDb,
+    case couch_db:open(DbName, [{user_ctx, UserCtx}]) of
+    {ok, Db2} ->
+        keep_sending_changes(
+            Args, Callback, Db2, EndSeq, Prepend, Timeout, TimeoutFun);
+    _Else ->
+        end_sending_changes(Callback, EndSeq, Args#changes_args.feed)
+    end.
+
+%% Signals the end of a feed: calls Callback with {stop, EndSeq} and the
+%% feed type, and returns whatever Callback returns.
+end_sending_changes(Callback, EndSeq, ResponseType) ->
+    StopEvent = {stop, EndSeq},
+    Callback(StopEvent, ResponseType).
+
+%% Fold callback passed to couch_db:changes_since/6 by send_changes/5.
+%%
+%% DocInfos is a non-empty list of #doc_info{} records; the id, sequence and
+%% first revision of the first one describe the row. The accumulator is
+%% {Db, Seq, Prepend, FilterFun, Callback, ResponseType, Limit, IncludeDocs}.
+%%
+%% The filter fun decides which revisions are reported (null results are
+%% dropped). If any remain, a row is passed to Callback as
+%% {change, Row, Prepend} and Limit is decremented. Returns {Go, NewAcc}
+%% with Go being ok or stop.
+%%
+%% Go is stop only after a row was actually emitted and that row used up the
+%% limit. A change rejected by the filter neither consumes the limit nor
+%% stops the enumeration; otherwise a filtered feed with limit=1 would end
+%% on the first non-matching change without returning anything.
+%% NOTE(review): presumably changes_since/6 halts its fold on stop -- that
+%% code is not visible from this module.
+%%
+%% "continuous" feeds send every row with an empty prepend and keep nil in
+%% the accumulator; other feeds send the accumulated Prepend and then switch
+%% it to <<",\n">> so later rows are separated.
+changes_enumerator(DocInfos, {Db, _, _, FilterFun, Callback, "continuous",
+    Limit, IncludeDocs}) ->
+
+    [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_]
+        = DocInfos,
+    case passing_results(FilterFun, DocInfos) of
+    [] ->
+        {ok, {Db, Seq, nil, FilterFun, Callback, "continuous", Limit,
+                IncludeDocs}
+        };
+    Results ->
+        ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs),
+        Callback({change, ChangesRow, <<"">>}, "continuous"),
+        {go_after_row(Limit),
+            {Db, Seq, nil, FilterFun, Callback, "continuous", Limit - 1,
+                IncludeDocs}
+        }
+    end;
+changes_enumerator(DocInfos, {Db, _, Prepend, FilterFun, Callback, ResponseType,
+    Limit, IncludeDocs}) ->
+
+    [#doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}|_]
+        = DocInfos,
+    case passing_results(FilterFun, DocInfos) of
+    [] ->
+        {ok, {Db, Seq, Prepend, FilterFun, Callback, ResponseType, Limit,
+                IncludeDocs}
+        };
+    Results ->
+        ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs),
+        Callback({change, ChangesRow, Prepend}, ResponseType),
+        {go_after_row(Limit),
+            {Db, Seq, <<",\n">>, FilterFun, Callback, ResponseType, Limit - 1,
+                IncludeDocs}
+        }
+    end.
+
+%% Applies the filter fun to the doc infos and drops null results.
+passing_results(FilterFun, DocInfos) ->
+    [Result || Result <- FilterFun(DocInfos), Result /= null].
+
+%% Decides whether to go on after emitting a row, given the limit that was
+%% in effect before the row was sent.
+go_after_row(Limit) when Limit =< 1 -> stop;
+go_after_row(_Limit) -> ok.
+
+
+%% Builds one row of the changes feed as a JSON object term {Props} with the
+%% members seq, id and changes, followed by the deleted marker when Del is
+%% true. When the last argument (include_docs) is true, the members returned
+%% by couch_httpd_view:doc_member/2 for {Id, Rev} are appended as well.
+changes_row(_, Seq, Id, Del, Results, _, false) ->
+    Base = [{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}],
+    {Base ++ deleted_item(Del)};
+changes_row(Db, Seq, Id, Del, Results, Rev, true) ->
+    Base = [{<<"seq">>, Seq}, {<<"id">>, Id}, {<<"changes">>, Results}],
+    DeletedMember = deleted_item(Del),
+    DocMember = couch_httpd_view:doc_member(Db, {Id, Rev}),
+    {Base ++ DeletedMember ++ DocMember}.
+
+%% Returns the row member that marks a deleted document: [{deleted, true}]
+%% when the argument is true, and [] for any other value.
+%% NOTE(review): the key is the atom deleted while the other row keys are
+%% binaries -- confirm that consumers of the row expect this.
+deleted_item(Del) ->
+    case Del of
+    true ->
+        [{deleted, true}];
+    _ ->
+        []
+    end.
+
+% Blocks until a db_updated message arrives, then swallows any further
+% db_updated messages already queued and returns updated.
+% If nothing arrives within Timeout, TimeoutFun() decides what happens next:
+% ok means wait for another Timeout period, stop makes this function
+% return stop.
+wait_db_updated(Timeout, TimeoutFun) ->
+    receive
+    db_updated ->
+        get_rest_db_updated()
+    after Timeout ->
+        Verdict = TimeoutFun(),
+        case Verdict of
+        stop ->
+            stop;
+        ok ->
+            wait_db_updated(Timeout, TimeoutFun)
+        end
+    end.
+
+% Removes every db_updated message currently in the caller's mailbox without
+% blocking, and returns updated once none are left.
+get_rest_db_updated() ->
+    Pending = receive
+        db_updated -> true
+    after 0 ->
+        false
+    end,
+    case Pending of
+    true ->
+        get_rest_db_updated();
+    false ->
+        updated
+    end.