You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by da...@apache.org on 2009/06/01 01:43:23 UTC
svn commit: r780529 - in /couchdb/trunk/src/couchdb: couch_httpd_db.erl couch_ref_counter.erl
Author: damien
Date: Sun May 31 23:43:22 2009
New Revision: 780529
URL: http://svn.apache.org/viewvc?rev=780529&view=rev
Log:
Added timeout and heartbeat options to the _changes API
Modified:
couchdb/trunk/src/couchdb/couch_httpd_db.erl
couchdb/trunk/src/couchdb/couch_ref_counter.erl
Modified: couchdb/trunk/src/couchdb/couch_httpd_db.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_httpd_db.erl?rev=780529&r1=780528&r2=780529&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_httpd_db.erl (original)
+++ couchdb/trunk/src/couchdb/couch_httpd_db.erl Sun May 31 23:43:22 2009
@@ -43,6 +43,25 @@
do_db_req(Req, Handler)
end.
+get_changes_timeout(Req, Resp) ->
+ DefaultTimeout = list_to_integer(
+ couch_config:get("httpd", "changes_timeout", "60000")),
+ case couch_httpd:qs_value(Req, "heartbeat") of
+ undefined ->
+ case couch_httpd:qs_value(Req, "timeout") of
+ undefined ->
+ {DefaultTimeout, fun() -> stop end};
+ TimeoutList ->
+ {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]),
+ fun() -> stop end}
+ end;
+ "true" ->
+ {DefaultTimeout, fun() -> send_chunk(Resp, "\n"), ok end};
+ TimeoutList ->
+ {lists:min([DefaultTimeout, list_to_integer(TimeoutList)]),
+ fun() -> send_chunk(Resp, " \n"), ok end}
+ end.
+
handle_changes_req(#httpd{method='GET',path_parts=[DbName|_]}=Req, Db) ->
StartSeq = list_to_integer(couch_httpd:qs_value(Req, "since", "0")),
@@ -57,13 +76,14 @@
(_) ->
ok
end),
+ {Timeout, TimeoutFun} = get_changes_timeout(Req, Resp),
couch_stats_collector:track_process_count(Self,
{httpd, clients_requesting_changes}),
try
- keep_sending_changes(Req, Resp, Db, StartSeq, <<"">>)
+ keep_sending_changes(Req, Resp, Db, StartSeq, <<"">>, Timeout, TimeoutFun)
after
couch_db_update_notifier:stop(Notify),
- wait_db_updated(0) % clean out any remaining update messages
+ get_rest_db_updated() % clean out any remaining update messages
end;
"false" ->
@@ -77,18 +97,30 @@
send_method_not_allowed(Req, "GET,HEAD").
% waits for a db_updated msg, if there are multiple msgs, collects them.
-wait_db_updated(Timeout) ->
- receive db_updated ->
- wait_db_updated(0)
- after Timeout -> ok
+wait_db_updated(Timeout, TimeoutFun) ->
+ receive db_updated -> get_rest_db_updated()
+ after Timeout ->
+ case TimeoutFun() of
+ ok -> wait_db_updated(Timeout, TimeoutFun);
+ stop -> stop
+ end
+ end.
+
+get_rest_db_updated() ->
+ receive db_updated -> get_rest_db_updated()
+ after 0 -> updated
end.
-keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, Db, StartSeq, Prepend) ->
+keep_sending_changes(#httpd{user_ctx=UserCtx,path_parts=[DbName|_]}=Req, Resp, Db, StartSeq, Prepend, Timeout, TimeoutFun) ->
{ok, {EndSeq, Prepend2}} = send_changes(Req, Resp, Db, StartSeq, Prepend),
couch_db:close(Db),
- wait_db_updated(infinity),
- {ok, Db2} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
- keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2).
+ case wait_db_updated(Timeout, TimeoutFun) of
+ updated ->
+ {ok, Db2} = couch_db:open(DbName, [{user_ctx, UserCtx}]),
+ keep_sending_changes(Req, Resp, Db2, EndSeq, Prepend2, Timeout, TimeoutFun);
+ stop ->
+ send_chunk(Resp, io_lib:format("\n],\n\"last_seq\":~w}\n", [EndSeq]))
+ end.
send_changes(Req, Resp, Db, StartSeq, Prepend0) ->
Style = list_to_existing_atom(
Modified: couchdb/trunk/src/couchdb/couch_ref_counter.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/couchdb/couch_ref_counter.erl?rev=780529&r1=780528&r2=780529&view=diff
==============================================================================
--- couchdb/trunk/src/couchdb/couch_ref_counter.erl (original)
+++ couchdb/trunk/src/couchdb/couch_ref_counter.erl Sun May 31 23:43:22 2009
@@ -75,7 +75,9 @@
erlang:demonitor(MonRef, [flush]),
dict:erase(Pid, Referrers);
{ok, {MonRef, Num}} ->
- dict:store(Pid, {MonRef, Num-1}, Referrers)
+ dict:store(Pid, {MonRef, Num-1}, Referrers);
+ error ->
+ Referrers
end,
maybe_close_async(Srv#srv{referrers=Referrers2}).