You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2009/06/01 01:43:23 UTC

svn commit: r780529 - in /couchdb/trunk/src/couchdb: couch_httpd_db.erl couch_ref_counter.erl

Author: damien
Date: Sun May 31 23:43:22 2009
New Revision: 780529

URL: http://svn.apache.org/viewvc?rev=780529&view=rev
Log:
Added timeout and heartbeat options to the _changes api

Modified:
    couchdb/trunk/src/couchdb/couch_httpd_db.erl
    couchdb/trunk/src/couchdb/couch_ref_counter.erl

Modified: couchdb/trunk/src/couchdb/couch_httpd_db.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_db.erl?rev=780529&r1=780528&r2=780529&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_db.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_db.erl Sun May 31 23:43:22 2009
@@ -43,6 +43,25 @@
         do_db_req(Req, Handler)
     end.
 
+get_changes_timeout(Req, Resp) ->
+    DefaultTimeout = list_to_integer(
+            couch_config:get("httpd", "changes_timeout", "60000")),
+    case couch_httpd:qs_value(Req, "heartbeat") of
+    undefined ->
+        case couch_httpd:qs_value(Req, "timeout") of
+        undefined ->
+            {DefaultTimeout, fun() -> stop end};
+        TimeoutList ->
+            {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]),
+                fun() -> stop end}
+        end;
+    "true" ->
+        {DefaultTimeout, fun() -> send_chunk(Resp, "\n"), ok end};
+    TimeoutList ->
+        {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]),
+            fun() -> send_chunk(Resp, " \n"), ok end}
+    end.
+
 handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) ->
     StartSeq = list_to_integer(couch_httpd:qs_value(Req, "since", "0")),
 
@@ -57,13 +76,14 @@
             (_) ->
                 ok
             end),
+        {Timeout, TimeoutFun} = get_changes_timeout(Req, Resp),
         couch_stats_collector:track_process_count(Self,
                             {httpd, clients_requesting_changes}),
         try
-            keep_sending_changes(Req, Resp, Db, StartSeq, <<"">>)
+            keep_sending_changes(Req, Resp, Db, StartSeq, <<"">>, Timeout, TimeoutFun)
         after
             couch_db_update_notifier:stop(Notify),
-            wait_db_updated(0) % clean out any remaining update messages
+            get_rest_db_updated() % clean out any remaining update messages
         end;
         
     "false" ->
@@ -77,18 +97,30 @@
     send_method_not_allowed(Req, "GET,HEAD").
 
 % waits for a db_updated msg, if there are multiple msgs, collects them.
-wait_db_updated(Timeout) ->
-    receive db_updated ->
-        wait_db_updated(0)
-    after Timeout -> ok
+wait_db_updated(Timeout, TimeoutFun) ->
+    receive db_updated -> get_rest_db_updated()
+    after Timeout ->
+        case TimeoutFun() of
+        ok -> wait_db_updated(Timeout, TimeoutFun);
+        stop -> stop
+        end
+    end.
+    
+get_rest_db_updated() ->
+    receive db_updated -> get_rest_db_updated()
+    after 0 -> updated
     end.
 
-keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, Db, StartSeq, Prepend) ->
+keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, Db, StartSeq, Prepend, Timeout, TimeoutFun) ->
     {ok, {EndSeq, Prepend2}} = send_changes(Req, Resp, Db, StartSeq, Prepend),
     couch_db:close(Db),
-    wait_db_updated(infinity),
-    {ok, Db2} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
-    keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2).
+    case wait_db_updated(Timeout, TimeoutFun) of
+    updated ->
+        {ok, Db2} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
+        keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2, Timeout, TimeoutFun);
+    stop ->
+        send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq]))
+    end.
 
 send_changes(Req, Resp, Db, StartSeq, Prepend0) ->
     Style = list_to_existing_atom(

Modified: couchdb/trunk/src/couchdb/couch_ref_counter.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_ref_counter.erl?rev=780529&r1=780528&r2=780529&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_ref_counter.erl (original)
+++ couchdb/trunk/src/couchdb/couch_ref_counter.erl Sun May 31 23:43:22 2009
@@ -75,7 +75,9 @@
         erlang:demonitor(MonRef, [flush]),
         dict:erase(Pid, Referrers);
     {ok, {MonRef, Num}} ->
-        dict:store(Pid, {MonRef, Num-1}, Referrers)
+        dict:store(Pid, {MonRef, Num-1}, Referrers);
+    error ->
+        Referrers
     end,
     maybe_close_async(Srv#srv{referrers=Referrers2}).