You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2021/09/24 13:07:12 UTC

[couchdb] branch changes_duration created (now 179c36a)

This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a change to branch changes_duration
in repository https://gitbox.apache.org/repos/asf/couchdb.git.


      at 179c36a  support maximum changes_duration configuration option

This branch includes the following new commits:

     new 179c36a  support maximum changes_duration configuration option

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[couchdb] 01/01: support maximum changes_duration configuration option

Posted by rn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rnewson pushed a commit to branch changes_duration
in repository https://gitbox.apache.org/repos/asf/couchdb.git

commit 179c36a0c10845c16ded8ed669d984e70805d2f6
Author: Robert Newson <rn...@apache.org>
AuthorDate: Fri Sep 24 14:03:35 2021 +0100

    support maximum changes_duration configuration option
---
 src/chttpd/src/chttpd_changes.erl | 61 +++++++++++++++++++++++++--------------
 1 file changed, 40 insertions(+), 21 deletions(-)

diff --git a/src/chttpd/src/chttpd_changes.erl b/src/chttpd/src/chttpd_changes.erl
index dbe3e95..9b1f84b 100644
--- a/src/chttpd/src/chttpd_changes.erl
+++ b/src/chttpd/src/chttpd_changes.erl
@@ -31,7 +31,7 @@
 
 %% export so we can use fully qualified call to facilitate hot-code upgrade
 -export([
-    keep_sending_changes/3
+    keep_sending_changes/4
 ]).
 
 -record(changes_acc, {
@@ -107,7 +107,8 @@ handle_db_changes(Args0, Req, Db0) ->
                     keep_sending_changes(
                         Args#changes_args{dir = fwd},
                         Acc0,
-                        true
+                        true,
+                        os:timestamp()
                     )
                 after
                     fabric2_events:stop_listener(Listener),
@@ -573,7 +574,7 @@ send_lookup_changes(FullDocInfos, StartSeq, Dir, Db, Fun, Acc0) ->
             {ok, FinalAcc}
     end.
 
-keep_sending_changes(Args, Acc0, FirstRound) ->
+keep_sending_changes(Args, Acc0, FirstRound, T0) ->
     #changes_args{
         feed = ResponseType,
         limit = Limit,
@@ -604,24 +605,32 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
                 true ->
                     case wait_updated(Timeout, TimeoutFun, UserAcc3) of
                         {updated, UserAcc4} ->
-                            UserCtx = fabric2_db:get_user_ctx(Db),
-                            DbOptions1 = [{user_ctx, UserCtx} | DbOptions],
-                            case fabric2_db:open(fabric2_db:name(Db), DbOptions1) of
-                                {ok, Db2} ->
-                                    ?MODULE:keep_sending_changes(
-                                        Args#changes_args{limit = NewLimit},
-                                        ChangesAcc#changes_acc{
-                                            db = Db2,
-                                            user_acc = UserAcc4,
-                                            seq = EndSeq,
-                                            prepend = Prepend2,
-                                            timeout = Timeout,
-                                            timeout_fun = TimeoutFun
-                                        },
-                                        false
-                                    );
-                                _Else ->
-                                    end_sending_changes(Callback, UserAcc3, EndSeq)
+                            AccumulatedTime = timer:now_diff(os:timestamp(), T0) div 1000,
+                            Max = changes_duration(),
+                            case AccumulatedTime > Max of
+                                true ->
+                                    end_sending_changes(Callback, UserAcc4, EndSeq);
+                                false ->
+                                    UserCtx = fabric2_db:get_user_ctx(Db),
+                                    DbOptions1 = [{user_ctx, UserCtx} | DbOptions],
+                                    case fabric2_db:open(fabric2_db:name(Db), DbOptions1) of
+                                        {ok, Db2} ->
+                                            ?MODULE:keep_sending_changes(
+                                                Args#changes_args{limit = NewLimit},
+                                                ChangesAcc#changes_acc{
+                                                    db = Db2,
+                                                    user_acc = UserAcc4,
+                                                    seq = EndSeq,
+                                                    prepend = Prepend2,
+                                                    timeout = Timeout,
+                                                    timeout_fun = TimeoutFun
+                                                },
+                                                false,
+                                                T0
+                                            );
+                                        _Else ->
+                                            end_sending_changes(Callback, UserAcc3, EndSeq)
+                                    end
                             end;
                         {stop, UserAcc4} ->
                             end_sending_changes(Callback, UserAcc4, EndSeq)
@@ -629,6 +638,16 @@ keep_sending_changes(Args, Acc0, FirstRound) ->
             end
     end.
 
+
+changes_duration() ->
+    %% Max changes-feed duration in ms, read from the original (3.x) "fabric" config section; infinity when the lookup returns undefined.
+    case config:get("fabric", "changes_duration") of
+        undefined ->
+            infinity;
+        MaxStr ->
+            list_to_integer(MaxStr)
+    end.
+
 notify_waiting_for_updates(Callback, UserAcc) ->
     Callback(waiting_for_updates, UserAcc).