Use a record for changes_callback accumulator

This change allows us to evolve the accumulator in a less brittle way
and sets the stage for new data to be held in the accumulator to address
a follow-up issue (the issue reference is missing from this message).

In the course of this change I also switched the feed labels from strings
to atoms (they're only used for pattern matching in the accumulator, and
multiple matches are executed for every row in the feed, so it seemed
silly to be using Erlang lists for that comparison), and I explicitly
indicated when we start a chunked response instead of guessing it
heuristically based on other contents in the accumulator.



Branch: refs/heads/master
Commit: f45c8b2c4262c682000b5e2ac50d16c9cade3ec6
Parents: 87fa816
Author: Adam Kocoloski <>
Authored: Mon Jun 22 19:41:37 2015 -0400
Committer: Adam Kocoloski <>
Committed: Mon Jun 22 20:57:55 2015 -0400

 src/chttpd_db.erl | 80 ++++++++++++++++++++++++++++++++------------------
 1 file changed, 51 insertions(+), 29 deletions(-)
diff --git a/src/chttpd_db.erl b/src/chttpd_db.erl
index 017f832..71c6cec 100644
--- a/src/chttpd_db.erl
+++ b/src/chttpd_db.erl
@@ -34,6 +34,15 @@
     atts_since = nil
+% Accumulator for changes_callback function
+-record(cacc, {
+    etag,
+    feed,
+    mochi,
+    prepend = "",
+    responding = false
 -define(IS_ALL_DOCS(T), (
     T == <<"_all_docs">>
     orelse T == <<"_local_docs">>
@@ -82,13 +91,14 @@ handle_changes_req1(#httpd{}=Req, Db) ->
         DeltaT = timer:now_diff(os:timestamp(), T0) / 1000,
         couch_stats:update_histogram([couchdb, dbinfo], DeltaT),
         chttpd:etag_respond(Req, Etag, fun() ->
-            fabric:changes(Db, fun changes_callback/2, {"normal", {"Etag",Etag}, Req},
-                ChangesArgs)
+            Acc0 = #cacc{feed = normal, etag = Etag, mochi = Req},
+            fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs)
     Feed when Feed =:= "continuous"; Feed =:= "longpoll"; Feed =:= "eventsource"  ->
         couch_stats:increment_counter([couchdb, httpd, clients_requesting_changes]),
+        Acc0 = #cacc{feed = list_to_atom(Feed), mochi = Req},
-            fabric:changes(Db, fun changes_callback/2, {Feed, Req}, ChangesArgs)
+            fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs)
             couch_stats:decrement_counter([couchdb, httpd, clients_requesting_changes])
@@ -98,13 +108,15 @@ handle_changes_req1(#httpd{}=Req, Db) ->
 % callbacks for continuous feed (newline-delimited JSON Objects)
-changes_callback(start, {"continuous", Req}) ->
-    {ok, Resp} = chttpd:start_delayed_json_response(Req, 200),
-    {ok, {"continuous", Resp}};
-changes_callback({change, Change}, {"continuous", Resp}) ->
+changes_callback(start, #cacc{feed = continuous} = Acc) ->
+    {ok, Resp} = chttpd:start_delayed_json_response(Acc#cacc.mochi, 200),
+    {ok, Acc#cacc{mochi = Resp, responding = true}};
+changes_callback({change, Change}, #cacc{feed = continuous} = Acc) ->
+    #cacc{mochi = Resp} = Acc,
     {ok, Resp1} = chttpd:send_delayed_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]),
-    {ok, {"continuous", Resp1}};
-changes_callback({stop, EndSeq0, Pending}, {"continuous", Resp}) ->
+    {ok, Acc#cacc{mochi = Resp1}};
+changes_callback({stop, EndSeq0, Pending}, #cacc{feed = continuous} = Acc) ->
+    #cacc{mochi = Resp} = Acc,
     EndSeq = case is_old_couch(Resp) of true -> 0; false -> EndSeq0 end,
     Row = {[
         {<<"last_seq">>, EndSeq},
@@ -114,14 +126,16 @@ changes_callback({stop, EndSeq0, Pending}, {"continuous", Resp}) ->
 % callbacks for eventsource feed (newline-delimited eventsource Objects)
-changes_callback(start, {"eventsource", Req}) ->
+changes_callback(start, #cacc{feed = eventsource} = Acc) ->
+    #cacc{mochi = Req} = Acc,
     Headers = [
         {"Content-Type", "text/event-stream"},
         {"Cache-Control", "no-cache"}
     {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, Headers),
-    {ok, {"eventsource", Resp}};
-changes_callback({change, {ChangeProp}=Change}, {"eventsource", Resp}) ->
+    {ok, Acc#cacc{mochi = Resp, responding = true}};
+changes_callback({change, {ChangeProp}=Change}, #cacc{feed = eventsource} = Acc) ->
+    #cacc{mochi = Resp} = Acc,
     Seq = proplists:get_value(seq, ChangeProp),
     Chunk = [
         "data: ", ?JSON_ENCODE(Change),
@@ -129,28 +143,34 @@ changes_callback({change, {ChangeProp}=Change}, {"eventsource", Resp}) ->
     {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk),
-    {ok, {"eventsource", Resp1}};
-changes_callback(timeout, {"eventsource", Resp}) ->
+changes_callback(timeout, #cacc{feed = eventsource} = Acc) ->
+    #cacc{mochi = Resp} = Acc,
     Chunk = "event: heartbeat\ndata: \n\n",
     {ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk),
-    {ok, {"eventsource", Resp1}};
+    {ok, Acc#cacc{mochi = Resp1}};
-changes_callback({stop, _EndSeq}, {"eventsource", Resp}) ->
+changes_callback({stop, _EndSeq}, #cacc{feed = eventsource} = Acc) ->
+    Resp = Acc#cacc.mochi,
 % callbacks for longpoll and normal (single JSON Object)
-changes_callback(start, {"normal", {"Etag", Etag}, Req}) ->
+changes_callback(start, #cacc{feed = normal} = Acc) ->
+    #cacc{etag = Etag, mochi = Req} = Acc,
     FirstChunk = "{\"results\":[\n",
     {ok, Resp} = chttpd:start_delayed_json_response(Req, 200,
         [{"Etag",Etag}], FirstChunk),
-    {ok, {"", Resp}};
-changes_callback(start, {_, Req}) ->
+    {ok, Acc#cacc{mochi = Resp, responding = true}};
+changes_callback(start, Acc) ->
+    #cacc{mochi = Req} = Acc,
     FirstChunk = "{\"results\":[\n",
     {ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], FirstChunk),
-    {ok, {"", Resp}};
-changes_callback({change, Change}, {Prepend, Resp}) ->
+    {ok, Acc#cacc{mochi = Resp, responding = true}};
+changes_callback({change, Change}, Acc) ->
+    #cacc{prepend = Prepend, mochi = Resp} = Acc,
     {ok, Resp1} = chttpd:send_delayed_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]),
-    {ok, {",\r\n", Resp1}};
-changes_callback({stop, EndSeq, Pending}, {_, Resp}) ->
+    {ok, Acc#cacc{prepend = ",\r\n", mochi = Resp1}};
+changes_callback({stop, EndSeq, Pending}, Acc) ->
+    #cacc{mochi = Resp} = Acc,
     {ok, Resp1} = case is_old_couch(Resp) of
     true ->
         chttpd:send_delayed_chunk(Resp, "\n],\n\"last_seq\":0}\n");
@@ -165,15 +185,17 @@ changes_callback({stop, EndSeq, Pending}, {_, Resp}) ->
-changes_callback(timeout, {Prepend, Resp}) ->
-    {ok, Resp1} = chttpd:send_delayed_chunk(Resp, "\n"),
-    {ok, {Prepend, Resp1}};
-changes_callback({error, Reason}, {_, #httpd{}=Req}) ->
+changes_callback(timeout, Acc) ->
+    {ok, Resp1} = chttpd:send_delayed_chunk(Acc#cacc.mochi, "\n"),
+    {ok, Acc#cacc{mochi = Resp1}};
+changes_callback({error, Reason}, #cacc{mochi = #httpd{}} = Acc) ->
+    #cacc{mochi = Req} = Acc,
     chttpd:send_error(Req, Reason);
-changes_callback({error, Reason}, {"normal", {"Etag", _Etag}, Req}) ->
+changes_callback({error, Reason}, #cacc{feed = normal, responding = false} = Acc) ->
+    #cacc{mochi = Req} = Acc,
     chttpd:send_error(Req, Reason);
-changes_callback({error, Reason}, {_, Resp}) ->
-    chttpd:send_delayed_error(Resp, Reason).
+changes_callback({error, Reason}, Acc) ->
+    chttpd:send_delayed_error(Acc#cacc.mochi, Reason).
 is_old_couch(Resp) ->
     MochiReq = chttpd:get_delayed_req(Resp),