You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by fd...@apache.org on 2010/09/09 20:22:23 UTC

svn commit: r995529 - /couchdb/branches/new_replicator/src/couchdb/couch_changes.erl

Author: fdmanana
Date: Thu Sep  9 18:22:22 2010
New Revision: 995529

URL: http://svn.apache.org/viewvc?rev=995529&view=rev
Log:
Merged revision 995528 from trunk:

Refactor changes module to allow for accumulators with the callback (optional, doesn't break public API).


Modified:
    couchdb/branches/new_replicator/src/couchdb/couch_changes.erl

Modified: couchdb/branches/new_replicator/src/couchdb/couch_changes.erl
URL: http://svn.apache.org/viewvc/couchdb/branches/new_replicator/src/couchdb/couch_changes.erl?rev=995529&r1=995528&r2=995529&view=diff
==============================================================================
--- couchdb/branches/new_replicator/src/couchdb/couch_changes.erl (original)
+++ couchdb/branches/new_replicator/src/couchdb/couch_changes.erl Thu Sep  9 18:22:22 2010
@@ -17,17 +17,18 @@
 
 %% @type Req -> #httpd{} | {json_req, JsonObj()}
 handle_changes(#changes_args{style=Style}=Args1, Req, Db) ->
-    Args = Args1#changes_args{filter=
-            make_filter_fun(Args1#changes_args.filter, Style, Req, Db)},
+    #changes_args{feed = Feed} = Args = Args1#changes_args{
+        filter = make_filter_fun(Args1#changes_args.filter, Style, Req, Db)
+    },
     StartSeq = case Args#changes_args.dir of
     rev ->
         couch_db:get_update_seq(Db);
     fwd ->
         Args#changes_args.since
     end,
-    if Args#changes_args.feed == "continuous" orelse
-        Args#changes_args.feed == "longpoll" ->
-        fun(Callback) ->
+    if Feed == "continuous" orelse Feed == "longpoll" ->
+        fun(CallbackAcc) ->
+            {Callback, UserAcc} = get_callback_acc(CallbackAcc),
             Self = self(),
             {ok, Notify} = couch_db_update_notifier:start_link(
                 fun({_, DbName}) when DbName == Db#db.name ->
@@ -36,7 +37,7 @@ handle_changes(#changes_args{style=Style
                     ok
                 end
             ),
-            start_sending_changes(Callback, Args#changes_args.feed),
+            UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
             {Timeout, TimeoutFun} = get_changes_timeout(Args, Callback),
             couch_stats_collector:track_process_count(
                 Self,
@@ -46,6 +47,7 @@ handle_changes(#changes_args{style=Style
                 keep_sending_changes(
                     Args,
                     Callback,
+                    UserAcc2,
                     Db,
                     StartSeq,
                     <<"">>,
@@ -54,24 +56,31 @@ handle_changes(#changes_args{style=Style
                 )
             after
                 couch_db_update_notifier:stop(Notify),
-                get_rest_db_updated() % clean out any remaining update messages
+                get_rest_db_updated(ok) % clean out any remaining update messages
             end
         end;
     true ->
-        fun(Callback) ->
-            start_sending_changes(Callback, Args#changes_args.feed),
-            {ok, {_, LastSeq, _Prepend, _, _, _, _, _}} =
+        fun(CallbackAcc) ->
+            {Callback, UserAcc} = get_callback_acc(CallbackAcc),
+            UserAcc2 = start_sending_changes(Callback, UserAcc, Feed),
+            {ok, {_, LastSeq, _Prepend, _, _, UserAcc3, _, _, _}} =
                 send_changes(
                     Args#changes_args{feed="normal"},
                     Callback,
+                    UserAcc2,
                     Db,
                     StartSeq,
-                    <<"">>
+                    <<>>
                 ),
-            end_sending_changes(Callback, LastSeq, Args#changes_args.feed)
+            end_sending_changes(Callback, UserAcc3, LastSeq, Feed)
         end
     end.
 
+get_callback_acc({Callback, _UserAcc} = Pair) when is_function(Callback, 3) ->
+    Pair;
+get_callback_acc(Callback) when is_function(Callback, 2) ->
+    {fun(Ev, Data, _) -> Callback(Ev, Data) end, ok}.
+
 %% @type Req -> #httpd{} | {json_req, JsonObj()}
 make_filter_fun(FilterName, Style, Req, Db) ->
     case [list_to_binary(couch_httpd:unquote(Part))
@@ -132,21 +141,23 @@ get_changes_timeout(Args, Callback) ->
         infinity ->
             {infinity, fun() -> stop end};
         _ ->
-            {lists:min([DefaultTimeout, Timeout]), fun() -> stop end}
+            {lists:min([DefaultTimeout, Timeout]),
+                fun(UserAcc) -> {stop, UserAcc} end}
         end;
     true ->
-        {DefaultTimeout, fun() -> Callback(timeout, ResponseType), ok end};
+        {DefaultTimeout,
+            fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end};
     _ ->
         {lists:min([DefaultTimeout, Heartbeat]),
-            fun() -> Callback(timeout, ResponseType), ok end}
+            fun(UserAcc) -> {ok, Callback(timeout, ResponseType, UserAcc)} end}
     end.
 
-start_sending_changes(_Callback, "continuous") ->
-    ok;
-start_sending_changes(Callback, ResponseType) ->
-    Callback(start, ResponseType).
+start_sending_changes(_Callback, UserAcc, "continuous") ->
+    UserAcc;
+start_sending_changes(Callback, UserAcc, ResponseType) ->
+    Callback(start, ResponseType, UserAcc).
 
-send_changes(Args, Callback, Db, StartSeq, Prepend) ->
+send_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend) ->
     #changes_args{
         style = Style,
         include_docs = IncludeDocs,
@@ -161,33 +172,34 @@ send_changes(Args, Callback, Db, StartSe
         StartSeq,
         fun changes_enumerator/2,
         [{dir, Dir}],
-        {Db, StartSeq, Prepend, FilterFun, Callback, ResponseType, Limit,
-            IncludeDocs}
+        {Db, StartSeq, Prepend, FilterFun, Callback, UserAcc, ResponseType,
+            Limit, IncludeDocs}
     ).
 
-keep_sending_changes(Args, Callback, Db, StartSeq, Prepend, Timeout,
+keep_sending_changes(Args, Callback, UserAcc, Db, StartSeq, Prepend, Timeout,
     TimeoutFun) ->
     #changes_args{
         feed = ResponseType,
         limit = Limit
     } = Args,
     % ?LOG_INFO("send_changes start ~p",[StartSeq]),
-    {ok, {_, EndSeq, Prepend2, _, _, _, NewLimit, _}} = send_changes(
-        Args#changes_args{dir=fwd}, Callback, Db, StartSeq, Prepend
+    {ok, {_, EndSeq, Prepend2, _, _, UserAcc2, _, NewLimit, _}} = send_changes(
+        Args#changes_args{dir=fwd}, Callback, UserAcc, Db, StartSeq, Prepend
     ),
     % ?LOG_INFO("send_changes last ~p",[EndSeq]),
     couch_db:close(Db),
     if Limit > NewLimit, ResponseType == "longpoll" ->
-        end_sending_changes(Callback, EndSeq, ResponseType);
+        end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType);
     true ->
-        case wait_db_updated(Timeout, TimeoutFun) of
-        updated ->
+        case wait_db_updated(Timeout, TimeoutFun, UserAcc2) of
+        {updated, UserAcc3} ->
             % ?LOG_INFO("wait_db_updated updated ~p",[{Db#db.name, EndSeq}]),
             case couch_db:open(Db#db.name, [{user_ctx, Db#db.user_ctx}]) of
             {ok, Db2} ->
                 keep_sending_changes(
                     Args#changes_args{limit=NewLimit},
                     Callback,
+                    UserAcc3,
                     Db2,
                     EndSeq,
                     Prepend2,
@@ -195,19 +207,19 @@ keep_sending_changes(Args, Callback, Db,
                     TimeoutFun
                 );
             _Else ->
-                end_sending_changes(Callback, EndSeq, ResponseType)
+                end_sending_changes(Callback, UserAcc2, EndSeq, ResponseType)
             end;
-        stop ->
+        {stop, UserAcc3} ->
             % ?LOG_INFO("wait_db_updated stop ~p",[{Db#db.name, EndSeq}]),
-            end_sending_changes(Callback, EndSeq, ResponseType)
+            end_sending_changes(Callback, UserAcc3, EndSeq, ResponseType)
         end
     end.
 
-end_sending_changes(Callback, EndSeq, ResponseType) ->
-    Callback({stop, EndSeq}, ResponseType).
+end_sending_changes(Callback, UserAcc, EndSeq, ResponseType) ->
+    Callback({stop, EndSeq}, ResponseType, UserAcc).
 
-changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, "continuous",
-    Limit, IncludeDocs}) ->
+changes_enumerator(DocInfo, {Db, _, _, FilterFun, Callback, UserAcc,
+    "continuous", Limit, IncludeDocs}) ->
 
     #doc_info{id=Id, high_seq=Seq,
             revs=[#rev_info{deleted=Del,rev=Rev}|_]} = DocInfo,
@@ -216,18 +228,18 @@ changes_enumerator(DocInfo, {Db, _, _, F
     Go = if Limit =< 1 -> stop; true -> ok end,
     case Results of
     [] ->
-        {Go, {Db, Seq, nil, FilterFun, Callback, "continuous", Limit,
+        {Go, {Db, Seq, nil, FilterFun, Callback, UserAcc, "continuous", Limit,
                 IncludeDocs}
         };
     _ ->
         ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs),
-        Callback({change, ChangesRow, <<"">>}, "continuous"),
-        {Go, {Db, Seq, nil, FilterFun, Callback, "continuous",  Limit - 1,
-                IncludeDocs}
+        UserAcc2 = Callback({change, ChangesRow, <<>>}, "continuous", UserAcc),
+        {Go, {Db, Seq, nil, FilterFun, Callback, UserAcc2, "continuous",
+                Limit - 1, IncludeDocs}
         }
     end;
-changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, ResponseType,
-    Limit, IncludeDocs}) ->
+changes_enumerator(DocInfo, {Db, _, Prepend, FilterFun, Callback, UserAcc,
+    ResponseType, Limit, IncludeDocs}) ->
 
     #doc_info{id=Id, high_seq=Seq, revs=[#rev_info{deleted=Del,rev=Rev}|_]}
         = DocInfo,
@@ -236,14 +248,14 @@ changes_enumerator(DocInfo, {Db, _, Prep
     Go = if Limit =< 1 -> stop; true -> ok end,
     case Results of
     [] ->
-        {Go, {Db, Seq, Prepend, FilterFun, Callback, ResponseType, Limit,
-                IncludeDocs}
+        {Go, {Db, Seq, Prepend, FilterFun, Callback, UserAcc, ResponseType,
+                Limit, IncludeDocs}
         };
     _ ->
         ChangesRow = changes_row(Db, Seq, Id, Del, Results, Rev, IncludeDocs),
-        Callback({change, ChangesRow, Prepend}, ResponseType),
-        {Go, {Db, Seq, <<",\n">>, FilterFun, Callback, ResponseType, Limit - 1,
-                IncludeDocs}
+        UserAcc2 = Callback({change, ChangesRow, Prepend}, ResponseType, UserAcc),
+        {Go, {Db, Seq, <<",\n">>, FilterFun, Callback, UserAcc2, ResponseType,
+                Limit - 1, IncludeDocs}
         }
     end.
 
@@ -259,16 +271,24 @@ deleted_item(true) -> [{<<"deleted">>, t
 deleted_item(_) -> [].
 
 % waits for a db_updated msg, if there are multiple msgs, collects them.
-wait_db_updated(Timeout, TimeoutFun) ->
-    receive db_updated -> get_rest_db_updated()
+wait_db_updated(Timeout, TimeoutFun, UserAcc) ->
+    receive
+    db_updated ->
+        get_rest_db_updated(UserAcc)
     after Timeout ->
-        case TimeoutFun() of
-        ok -> wait_db_updated(Timeout, TimeoutFun);
-        stop -> stop
+        {Go, UserAcc2} = TimeoutFun(UserAcc),
+        case Go of
+        ok ->
+            wait_db_updated(Timeout, TimeoutFun, UserAcc2);
+        stop ->
+            {stop, UserAcc2}
         end
     end.
 
-get_rest_db_updated() ->
-    receive db_updated -> get_rest_db_updated()
-    after 0 -> updated
+get_rest_db_updated(UserAcc) ->
+    receive
+    db_updated ->
+        get_rest_db_updated(UserAcc)
+    after 0 ->
+        {updated, UserAcc}
     end.