You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by to...@apache.org on 2020/04/15 22:27:36 UTC

[couchdb] branch prototype/fdb-layer updated: report changes stats intermittently (#2777)

This is an automated email from the ASF dual-hosted git repository.

tonysun83 pushed a commit to branch prototype/fdb-layer
in repository https://gitbox.apache.org/repos/asf/couchdb.git


The following commit(s) were added to refs/heads/prototype/fdb-layer by this push:
     new 27cbad7  report changes stats intermittently (#2777)
27cbad7 is described below

commit 27cbad74404d6d1c6ecf5717cf70c438f6c03eea
Author: Tony Sun <to...@gmail.com>
AuthorDate: Wed Apr 15 15:27:26 2020 -0700

    report changes stats intermittently (#2777)
    
    * report changes stats intermittently with boolean marker
    
    Stats are normally reported at the end of a request. With changes
    feeds, a request can be long-running or never end. This commit allows
    stats to be reported intermittently, at a configurable interval in
    seconds. The report function returns a boolean indicating whether
    stats were reported; the counters are reset only when it returns true.
---
 src/chttpd/src/chttpd.erl                    |  7 +-
 src/chttpd/src/chttpd_db.erl                 | 25 +++++---
 src/chttpd/src/chttpd_stats.erl              | 96 +++++++++++++++++++++-------
 src/chttpd/test/eunit/chttpd_stats_tests.erl | 77 ++++++++++++++++++++++
 4 files changed, 171 insertions(+), 34 deletions(-)

diff --git a/src/chttpd/src/chttpd.erl b/src/chttpd/src/chttpd.erl
index 2641007..4640258 100644
--- a/src/chttpd/src/chttpd.erl
+++ b/src/chttpd/src/chttpd.erl
@@ -269,8 +269,9 @@ handle_request_int(MochiReq) ->
 before_request(HttpReq) ->
     ctrace:is_enabled() andalso start_span(HttpReq),
     try
-        chttpd_stats:init(),
-        chttpd_plugin:before_request(HttpReq)
+        {ok, HttpReq1} = chttpd_plugin:before_request(HttpReq),
+        chttpd_stats:init(HttpReq1),
+        {ok, HttpReq1}
     catch Tag:Error ->
         {error, catch_error(HttpReq, Tag, Error)}
     end.
@@ -285,7 +286,7 @@ after_request(HttpReq, HttpResp0) ->
             {ok, HttpResp0#httpd_resp{status = aborted}}
         end,
     HttpResp2 = update_stats(HttpReq, HttpResp1),
-    chttpd_stats:report(HttpReq, HttpResp2),
+    chttpd_stats:report(HttpResp2),
     maybe_log(HttpReq, HttpResp2),
     HttpResp2.
 
diff --git a/src/chttpd/src/chttpd_db.erl b/src/chttpd/src/chttpd_db.erl
index 8dd0c93..8cfcfec 100644
--- a/src/chttpd/src/chttpd_db.erl
+++ b/src/chttpd/src/chttpd_db.erl
@@ -50,7 +50,8 @@
     chunks_sent = 0,
     buffer = [],
     bufsize = 0,
-    threshold
+    threshold,
+    include_docs
 }).
 
 -define(IS_ALL_DOCS(T), (
@@ -117,7 +118,8 @@ handle_changes_req_tx(#httpd{}=Req, Db) ->
         Acc0 = #cacc{
             feed = list_to_atom(Feed),
             mochi = Req,
-            threshold = Max
+            threshold = Max,
+            include_docs = ChangesArgs#changes_args.include_docs
         },
         try
             ChangesFun({fun changes_callback/2, Acc0})
@@ -133,8 +135,9 @@ handle_changes_req_tx(#httpd{}=Req, Db) ->
 changes_callback(start, #cacc{feed = continuous} = Acc) ->
     {ok, Resp} = chttpd:start_delayed_json_response(Acc#cacc.mochi, 200),
     {ok, Acc#cacc{mochi = Resp, responding = true}};
-changes_callback({change, Change}, #cacc{feed = continuous} = Acc) ->
-    chttpd_stats:incr_rows(),
+changes_callback({change, Change}, #cacc{feed = continuous,
+        include_docs = IncludeDocs} = Acc) ->
+    incr_stats_changes_feed(IncludeDocs),
     Data = [?JSON_ENCODE(Change) | "\n"],
     Len = iolist_size(Data),
     maybe_flush_changes_feed(Acc, Data, Len);
@@ -157,8 +160,9 @@ changes_callback(start, #cacc{feed = eventsource} = Acc) ->
     ],
     {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, Headers),
     {ok, Acc#cacc{mochi = Resp, responding = true}};
-changes_callback({change, {ChangeProp}=Change}, #cacc{feed = eventsource} = Acc) ->
-    chttpd_stats:incr_rows(),
+changes_callback({change, {ChangeProp}=Change},
+        #cacc{feed = eventsource, include_docs = IncludeDocs} = Acc) ->
+    incr_stats_changes_feed(IncludeDocs),
     Seq = proplists:get_value(seq, ChangeProp),
     Chunk = [
         "data: ", ?JSON_ENCODE(Change),
@@ -189,8 +193,8 @@ changes_callback(start, Acc) ->
     FirstChunk = "{\"results\":[\n",
     {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], FirstChunk),
     {ok, Acc#cacc{mochi = Resp, responding = true}};
-changes_callback({change, Change}, Acc) ->
-    chttpd_stats:incr_rows(),
+changes_callback({change, Change}, #cacc{include_docs = IncludeDocs} = Acc) ->
+    incr_stats_changes_feed(IncludeDocs),
     Data = [Acc#cacc.prepend, ?JSON_ENCODE(Change)],
     Len = iolist_size(Data),
     maybe_flush_changes_feed(Acc, Data, Len);
@@ -252,6 +256,11 @@ maybe_flush_changes_feed(Acc0, Data, Len) ->
     },
     {ok, Acc}.
 
+incr_stats_changes_feed(IncludeDocs) ->
+    chttpd_stats:incr_rows(),
+    if not IncludeDocs -> ok; true ->
+        chttpd_stats:incr_reads()
+    end.
 
 % Return the same response as if a compaction succeeded even though _compaction
 % isn't a valid operation in CouchDB >= 4.x anymore. This is mostly to not
diff --git a/src/chttpd/src/chttpd_stats.erl b/src/chttpd/src/chttpd_stats.erl
index 59ec926..27e9c31 100644
--- a/src/chttpd/src/chttpd_stats.erl
+++ b/src/chttpd/src/chttpd_stats.erl
@@ -14,8 +14,8 @@
 
 
 -export([
-    init/0,
-    report/2,
+    init/1,
+    report/1,
 
     incr_reads/0,
     incr_reads/1,
@@ -24,29 +24,40 @@
     incr_writes/1,
 
     incr_rows/0,
-    incr_rows/1
+    incr_rows/1,
+
+    update_interval/1
 ]).
 
 
 -record(st, {
     reads = 0,
     writes = 0,
-    rows = 0
+    rows = 0,
+    reporter,
+    last_report_ts = 0,
+    interval,
+    request
 }).
 
 
 -define(KEY, chttpd_stats).
+-define(INTERVAL_IN_SEC, 60).
 
-
-init() ->
-    put(?KEY, #st{}).
+init(Request) ->
+    Reporter = config:get("chttpd", "stats_reporter"),
+    Time = erlang:monotonic_time(second),
+    Interval = config:get_integer("chttpd", "stats_reporting_interval",
+        ?INTERVAL_IN_SEC),
+    put(?KEY, #st{reporter = Reporter, last_report_ts = Time,
+        interval = Interval, request = Request}).
 
 
-report(HttpReq, HttpResp) ->
+report(HttpResp) ->
     try
         case get(?KEY) of
             #st{} = St ->
-                report(HttpReq, HttpResp, St);
+                report(HttpResp, St);
             _ ->
                 ok
         end
@@ -57,19 +68,18 @@ report(HttpReq, HttpResp) ->
     end.
 
 
-report(HttpReq, HttpResp, St) ->
-    case config:get("chttpd", "stats_reporter") of
-        undefined ->
-            ok;
-        ModStr ->
-            Mod = list_to_existing_atom(ModStr),
-            #st{
-                reads = Reads,
-                writes = Writes,
-                rows = Rows
-            } = St,
-            Mod:report(HttpReq, HttpResp, Reads, Writes, Rows)
-    end.
+report(HttpResp, #st{reporter = undefined}) ->
+    ok;
+
+report(HttpResp, #st{reporter = Reporter} = St) ->
+    Mod = list_to_existing_atom(Reporter),
+    #st{
+        reads = Reads,
+        writes = Writes,
+        rows = Rows,
+        request = HttpReq
+    } = St,
+    Mod:report(HttpReq, HttpResp, Reads, Writes, Rows).
 
 
 incr_reads() ->
@@ -101,7 +111,47 @@ incr(Idx, Count) ->
         #st{} = St ->
             Total = element(Idx, St) + Count,
             NewSt = setelement(Idx, St, Total),
-            put(?KEY, NewSt);
+            put(?KEY, NewSt),
+            maybe_report_intermittent(St);
+        _ ->
+            ok
+    end.
+
+
+maybe_report_intermittent(State) ->
+    #st{last_report_ts = LastTime, interval = Interval} = State,
+    CurrentTime = erlang:monotonic_time(second),
+    case CurrentTime - LastTime of
+        Change when Change >= Interval ->
+            % The response is not available while the request is still in
+            % progress, so undefined is passed in its place. Modules that
+            % implement Mod:report(HttpReq, HttpResp, Reads, Writes, Rows)
+            % must handle HttpResp = undefined. Mod:report should also
+            % return a boolean indicating whether the stats should be reset.
+            case ?MODULE:report(undefined) of
+                true ->
+                    reset_stats(State, CurrentTime);
+                _ ->
+                    ok
+            end;
         _ ->
             ok
     end.
+
+
+update_interval(Interval) ->
+    case get(?KEY) of
+        #st{} = St ->
+            put(?KEY, St#st{interval = Interval});
+        _ ->
+            ok
+    end.
+
+
+reset_stats(State, NewTime) ->
+    put(?KEY, State#st{
+        reads = 0,
+        writes = 0,
+        rows = 0,
+        last_report_ts = NewTime
+    }).
diff --git a/src/chttpd/test/eunit/chttpd_stats_tests.erl b/src/chttpd/test/eunit/chttpd_stats_tests.erl
new file mode 100644
index 0000000..1742285
--- /dev/null
+++ b/src/chttpd/test/eunit/chttpd_stats_tests.erl
@@ -0,0 +1,77 @@
+-module(chttpd_stats_tests).
+
+-include_lib("couch/include/couch_eunit.hrl").
+-include_lib("couch/include/couch_db.hrl").
+
+
+start() ->
+    ok = application:start(config),
+    ok = application:start(couch_log).
+
+
+stop(_) ->
+    ok = application:stop(config),
+    ok = application:stop(couch_log).
+
+
+setup() ->
+    ok = meck:new(chttpd_stats, [passthrough]).
+
+
+teardown(_) ->
+    meck:unload(),
+    ok.
+
+
+
+chttpd_stats_test_() ->
+    {
+        "chttpd_stats tests",
+        {
+            setup,
+            fun start/0,
+            fun stop/1,
+            {
+                foreach,
+                fun setup/0, fun teardown/1,
+                [
+                    fun test_reset/1,
+                    fun test_no_reset/1
+                ]
+            }
+        }
+    }.
+
+
+test_reset(_) ->
+    ?_test(begin
+        chttpd_stats:init(undefined),
+        chttpd_stats:incr_rows(3),
+        chttpd_stats:incr_rows(),
+        chttpd_stats:incr_writes(5),
+        chttpd_stats:incr_writes(),
+        chttpd_stats:incr_reads(),
+        chttpd_stats:incr_reads(2),
+        State1 = get(chttpd_stats),
+        ?assertMatch({st, 3, 6, 4, _, _, _, _}, State1),
+
+        ok = meck:expect(chttpd_stats, report, fun(_) -> true end),
+        % force a reset with 0 interval
+        chttpd_stats:update_interval(0),
+        % after this is called, the report should happen and all counters
+        % should reset to 0
+        chttpd_stats:incr_rows(),
+        ResetState = get(chttpd_stats),
+        ?assertMatch({st, 0, 0, 0, _, _, _, _}, ResetState)
+    end).
+
+
+test_no_reset(_) ->
+    ?_test(begin
+        ok = meck:expect(chttpd_stats, report, fun(_) -> false end),
+        chttpd_stats:init(undefined),
+        chttpd_stats:update_interval(0),
+        chttpd_stats:incr_rows(),
+        State = get(chttpd_stats),
+        ?assertMatch({st, 0, 0, 1, _, _, _, _}, State)
+    end).