You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ko...@apache.org on 2015/06/23 04:01:08 UTC
[1/3] chttpd commit: updated refs/heads/2724-chunked-buffering to
d4e19ee
Repository: couchdb-chttpd
Updated Branches:
refs/heads/2724-chunked-buffering [created] d4e19ee78
Remove temporary upgrade clause
This clause was only needed for a very old hot code upgrade.
Project: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/commit/87fa8166
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/tree/87fa8166
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/diff/87fa8166
Branch: refs/heads/2724-chunked-buffering
Commit: 87fa8166701dd2a183d10e93a06f7f5ccb39478e
Parents: e7f9ed8
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Mon Jun 22 19:15:23 2015 -0400
Committer: Adam Kocoloski <ad...@cloudant.com>
Committed: Mon Jun 22 19:16:25 2015 -0400
----------------------------------------------------------------------
src/chttpd_db.erl | 6 ------
1 file changed, 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/blob/87fa8166/src/chttpd_db.erl
----------------------------------------------------------------------
diff --git a/src/chttpd_db.erl b/src/chttpd_db.erl
index 7f63023..017f832 100644
--- a/src/chttpd_db.erl
+++ b/src/chttpd_db.erl
@@ -104,9 +104,6 @@ changes_callback(start, {"continuous", Req}) ->
changes_callback({change, Change}, {"continuous", Resp}) ->
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]),
{ok, {"continuous", Resp1}};
-changes_callback({stop, EndSeq0}, {"continuous", Resp}) ->
- % Temporary upgrade clause - Case 24236
- changes_callback({stop, EndSeq0, null}, {"continuous", Resp});
changes_callback({stop, EndSeq0, Pending}, {"continuous", Resp}) ->
EndSeq = case is_old_couch(Resp) of true -> 0; false -> EndSeq0 end,
Row = {[
@@ -153,9 +150,6 @@ changes_callback(start, {_, Req}) ->
changes_callback({change, Change}, {Prepend, Resp}) ->
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]),
{ok, {",\r\n", Resp1}};
-changes_callback({stop, EndSeq}, Acc) ->
- % Temporary upgrade clause - Case 24236
- changes_callback({stop, EndSeq, null}, Acc);
changes_callback({stop, EndSeq, Pending}, {_, Resp}) ->
{ok, Resp1} = case is_old_couch(Resp) of
true ->
[2/3] chttpd commit: updated refs/heads/2724-chunked-buffering to
d4e19ee
Posted by ko...@apache.org.
Use a record for changes_callback accumulator
This change allows us to evolve the accumulator in a less-brittle way
and sets the stage for new data to be held in the accumulator to address
COUCHDB-2724.
In the course of this change I also switched the feed labels from strings
to atoms (they're only used for pattern matching in the accumulator, and
multiple matches are executed for every row in the feed, so it seemed
silly to be using Erlang lists for that comparison), and I explicitly
indicated when we start a chunked response instead of guessing it
heuristically based on other contents in the accumulator.
COUCHDB-2724
Project: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/commit/f45c8b2c
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/tree/f45c8b2c
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/diff/f45c8b2c
Branch: refs/heads/2724-chunked-buffering
Commit: f45c8b2c4262c682000b5e2ac50d16c9cade3ec6
Parents: 87fa816
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Mon Jun 22 19:41:37 2015 -0400
Committer: Adam Kocoloski <ad...@cloudant.com>
Committed: Mon Jun 22 20:57:55 2015 -0400
----------------------------------------------------------------------
src/chttpd_db.erl | 80 ++++++++++++++++++++++++++++++++------------------
1 file changed, 51 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/blob/f45c8b2c/src/chttpd_db.erl
----------------------------------------------------------------------
diff --git a/src/chttpd_db.erl b/src/chttpd_db.erl
index 017f832..71c6cec 100644
--- a/src/chttpd_db.erl
+++ b/src/chttpd_db.erl
@@ -34,6 +34,15 @@
atts_since = nil
}).
+% Accumulator for changes_callback function
+-record(cacc, {
+ etag,
+ feed,
+ mochi,
+ prepend = "",
+ responding = false
+}).
+
-define(IS_ALL_DOCS(T), (
T == <<"_all_docs">>
orelse T == <<"_local_docs">>
@@ -82,13 +91,14 @@ handle_changes_req1(#httpd{}=Req, Db) ->
DeltaT = timer:now_diff(os:timestamp(), T0) / 1000,
couch_stats:update_histogram([couchdb, dbinfo], DeltaT),
chttpd:etag_respond(Req, Etag, fun() ->
- fabric:changes(Db, fun changes_callback/2, {"normal", {"Etag",Etag}, Req},
- ChangesArgs)
+ Acc0 = #cacc{feed = normal, etag = Etag, mochi = Req},
+ fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs)
end);
Feed when Feed =:= "continuous"; Feed =:= "longpoll"; Feed =:= "eventsource" ->
couch_stats:increment_counter([couchdb, httpd, clients_requesting_changes]),
+ Acc0 = #cacc{feed = list_to_atom(Feed), mochi = Req},
try
- fabric:changes(Db, fun changes_callback/2, {Feed, Req}, ChangesArgs)
+ fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs)
after
couch_stats:decrement_counter([couchdb, httpd, clients_requesting_changes])
end;
@@ -98,13 +108,15 @@ handle_changes_req1(#httpd{}=Req, Db) ->
end.
% callbacks for continuous feed (newline-delimited JSON Objects)
-changes_callback(start, {"continuous", Req}) ->
- {ok, Resp} = chttpd:start_delayed_json_response(Req, 200),
- {ok, {"continuous", Resp}};
-changes_callback({change, Change}, {"continuous", Resp}) ->
+changes_callback(start, #cacc{feed = continuous} = Acc) ->
+ {ok, Resp} = chttpd:start_delayed_json_response(Acc#cacc.mochi, 200),
+ {ok, Acc#cacc{mochi = Resp, responding = true}};
+changes_callback({change, Change}, #cacc{feed = continuous} = Acc) ->
+ #cacc{mochi = Resp} = Acc,
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, [?JSON_ENCODE(Change) | "\n"]),
- {ok, {"continuous", Resp1}};
-changes_callback({stop, EndSeq0, Pending}, {"continuous", Resp}) ->
+ {ok, Acc#cacc{mochi = Resp1}};
+changes_callback({stop, EndSeq0, Pending}, #cacc{feed = continuous} = Acc) ->
+ #cacc{mochi = Resp} = Acc,
EndSeq = case is_old_couch(Resp) of true -> 0; false -> EndSeq0 end,
Row = {[
{<<"last_seq">>, EndSeq},
@@ -114,14 +126,16 @@ changes_callback({stop, EndSeq0, Pending}, {"continuous", Resp}) ->
chttpd:end_delayed_json_response(Resp1);
% callbacks for eventsource feed (newline-delimited eventsource Objects)
-changes_callback(start, {"eventsource", Req}) ->
+changes_callback(start, #cacc{feed = eventsource} = Acc) ->
+ #cacc{mochi = Req} = Acc,
Headers = [
{"Content-Type", "text/event-stream"},
{"Cache-Control", "no-cache"}
],
{ok, Resp} = chttpd:start_delayed_json_response(Req, 200, Headers),
- {ok, {"eventsource", Resp}};
-changes_callback({change, {ChangeProp}=Change}, {"eventsource", Resp}) ->
+ {ok, Acc#cacc{mochi = Resp, responding = true}};
+changes_callback({change, {ChangeProp}=Change}, #cacc{feed = eventsource} = Acc) ->
+ #cacc{mochi = Resp} = Acc,
Seq = proplists:get_value(seq, ChangeProp),
Chunk = [
"data: ", ?JSON_ENCODE(Change),
@@ -129,28 +143,34 @@ changes_callback({change, {ChangeProp}=Change}, {"eventsource", Resp}) ->
"\n\n"
],
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk),
- {ok, {"eventsource", Resp1}};
-changes_callback(timeout, {"eventsource", Resp}) ->
+ {ok, Acc#cacc{mochi = Resp1}};
+changes_callback(timeout, #cacc{feed = eventsource} = Acc) ->
+ #cacc{mochi = Resp} = Acc,
Chunk = "event: heartbeat\ndata: \n\n",
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, Chunk),
{ok, {"eventsource", Resp1}};
-changes_callback({stop, _EndSeq}, {"eventsource", Resp}) ->
+changes_callback({stop, _EndSeq}, #cacc{feed = eventsource} = Acc) ->
+ Resp = Acc#cacc.mochi,
chttpd:end_delayed_json_response(Resp);
% callbacks for longpoll and normal (single JSON Object)
-changes_callback(start, {"normal", {"Etag", Etag}, Req}) ->
+changes_callback(start, #cacc{feed = normal} = Acc) ->
+ #cacc{etag = Etag, mochi = Req} = Acc,
FirstChunk = "{\"results\":[\n",
{ok, Resp} = chttpd:start_delayed_json_response(Req, 200,
[{"Etag",Etag}], FirstChunk),
- {ok, {"", Resp}};
-changes_callback(start, {_, Req}) ->
+ {ok, Acc#cacc{mochi = Resp, responding = true}};
+changes_callback(start, Acc) ->
+ #cacc{mochi = Req} = Acc,
FirstChunk = "{\"results\":[\n",
{ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], FirstChunk),
- {ok, {"", Resp}};
-changes_callback({change, Change}, {Prepend, Resp}) ->
+ {ok, Acc#cacc{mochi = Resp, responding = true}};
+changes_callback({change, Change}, Acc) ->
+ #cacc{prepend = Prepend, mochi = Resp} = Acc,
{ok, Resp1} = chttpd:send_delayed_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]),
- {ok, {",\r\n", Resp1}};
-changes_callback({stop, EndSeq, Pending}, {_, Resp}) ->
+ {ok, Acc#cacc{prepend = ",\r\n", mochi = Resp1}};
+changes_callback({stop, EndSeq, Pending}, Acc) ->
+ #cacc{mochi = Resp} = Acc,
{ok, Resp1} = case is_old_couch(Resp) of
true ->
chttpd:send_delayed_chunk(Resp, "\n],\n\"last_seq\":0}\n");
@@ -165,15 +185,17 @@ changes_callback({stop, EndSeq, Pending}, {_, Resp}) ->
end,
chttpd:end_delayed_json_response(Resp1);
-changes_callback(timeout, {Prepend, Resp}) ->
- {ok, Resp1} = chttpd:send_delayed_chunk(Resp, "\n"),
- {ok, {Prepend, Resp1}};
-changes_callback({error, Reason}, {_, #httpd{}=Req}) ->
+changes_callback(timeout, Acc) ->
+ {ok, Resp1} = chttpd:send_delayed_chunk(Acc#cacc.mochi, "\n"),
+ {ok, Acc#cacc{mochi = Resp1}};
+changes_callback({error, Reason}, #cacc{mochi = #httpd{}} = Acc) ->
+ #cacc{mochi = Req} = Acc,
chttpd:send_error(Req, Reason);
-changes_callback({error, Reason}, {"normal", {"Etag", _Etag}, Req}) ->
+changes_callback({error, Reason}, #cacc{feed = normal, responding = false} = Acc) ->
+ #cacc{mochi = Req} = Acc,
chttpd:send_error(Req, Reason);
-changes_callback({error, Reason}, {_, Resp}) ->
- chttpd:send_delayed_error(Resp, Reason).
+changes_callback({error, Reason}, Acc) ->
+ chttpd:send_delayed_error(Acc#cacc.mochi, Reason).
is_old_couch(Resp) ->
MochiReq = chttpd:get_delayed_req(Resp),
[3/3] chttpd commit: updated refs/heads/2724-chunked-buffering to
d4e19ee
Posted by ko...@apache.org.
Buffer rows for normal/longpoll feeds
This patch causes the coordinator to accumulate data in its own buffer
and reduce the number of calls to write data on the socket. The size of
the buffer is configurable:
[httpd]
chunked_response_buffer = 1490
The default is chosen to approximately fill a standard Ethernet frame.
COUCHDB-2724
Project: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/commit/d4e19ee7
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/tree/d4e19ee7
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/diff/d4e19ee7
Branch: refs/heads/2724-chunked-buffering
Commit: d4e19ee78b32e5604f646ae8133e8ad14544f5e5
Parents: f45c8b2
Author: Adam Kocoloski <ad...@cloudant.com>
Authored: Mon Jun 22 21:45:26 2015 -0400
Committer: Adam Kocoloski <ad...@cloudant.com>
Committed: Mon Jun 22 21:59:41 2015 -0400
----------------------------------------------------------------------
src/chttpd_db.erl | 45 +++++++++++++++++++++++++++++++++++++--------
1 file changed, 37 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/couchdb-chttpd/blob/d4e19ee7/src/chttpd_db.erl
----------------------------------------------------------------------
diff --git a/src/chttpd_db.erl b/src/chttpd_db.erl
index 71c6cec..84c7084 100644
--- a/src/chttpd_db.erl
+++ b/src/chttpd_db.erl
@@ -40,7 +40,10 @@
feed,
mochi,
prepend = "",
- responding = false
+ responding = false,
+ buffer = [],
+ bufsize = 0,
+ threshold
}).
-define(IS_ALL_DOCS(T), (
@@ -83,6 +86,8 @@ handle_changes_req1(#httpd{}=Req, Db) ->
ChangesArgs = Args0#changes_args{
filter_fun = couch_changes:configure_filter(Raw, Style, Req, Db)
},
+ % Default to ~filling the payload of a standard Ethernet frame
+ Max = config:get_integer("httpd", "chunked_response_buffer", 1490),
case ChangesArgs#changes_args.feed of
"normal" ->
T0 = os:timestamp(),
@@ -91,12 +96,21 @@ handle_changes_req1(#httpd{}=Req, Db) ->
DeltaT = timer:now_diff(os:timestamp(), T0) / 1000,
couch_stats:update_histogram([couchdb, dbinfo], DeltaT),
chttpd:etag_respond(Req, Etag, fun() ->
- Acc0 = #cacc{feed = normal, etag = Etag, mochi = Req},
+ Acc0 = #cacc{
+ feed = normal,
+ etag = Etag,
+ mochi = Req,
+ threshold = Max
+ },
fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs)
end);
Feed when Feed =:= "continuous"; Feed =:= "longpoll"; Feed =:= "eventsource" ->
couch_stats:increment_counter([couchdb, httpd, clients_requesting_changes]),
- Acc0 = #cacc{feed = list_to_atom(Feed), mochi = Req},
+ Acc0 = #cacc{
+ feed = list_to_atom(Feed),
+ mochi = Req,
+ threshold = Max
+ },
try
fabric:changes(Db, fun changes_callback/2, Acc0, ChangesArgs)
after
@@ -166,16 +180,17 @@ changes_callback(start, Acc) ->
{ok, Resp} = chttpd:start_delayed_json_response(Req, 200, [], FirstChunk),
{ok, Acc#cacc{mochi = Resp, responding = true}};
changes_callback({change, Change}, Acc) ->
- #cacc{prepend = Prepend, mochi = Resp} = Acc,
- {ok, Resp1} = chttpd:send_delayed_chunk(Resp, [Prepend, ?JSON_ENCODE(Change)]),
- {ok, Acc#cacc{prepend = ",\r\n", mochi = Resp1}};
+ Data = [Acc#cacc.prepend, ?JSON_ENCODE(Change)],
+ Len = iolist_size(Data),
+ maybe_flush_changes_feed(Acc, Data, Len);
changes_callback({stop, EndSeq, Pending}, Acc) ->
- #cacc{mochi = Resp} = Acc,
+ #cacc{buffer = Buf, mochi = Resp} = Acc,
{ok, Resp1} = case is_old_couch(Resp) of
true ->
- chttpd:send_delayed_chunk(Resp, "\n],\n\"last_seq\":0}\n");
+ chttpd:send_delayed_chunk(Resp, [Buf | "\n],\n\"last_seq\":0}\n"]);
false ->
chttpd:send_delayed_chunk(Resp, [
+ Buf,
"\n],\n\"last_seq\":",
?JSON_ENCODE(EndSeq),
",\"pending\":",
@@ -197,6 +212,20 @@ changes_callback({error, Reason}, #cacc{feed = normal, responding = false} = Acc
changes_callback({error, Reason}, Acc) ->
chttpd:send_delayed_error(Acc#cacc.mochi, Reason).
+maybe_flush_changes_feed(#cacc{bufsize=Size, threshold=Max} = Acc, Data, Len)
+ when (Size + Len) > Max ->
+ #cacc{buffer = Buffer, mochi = Resp} = Acc,
+ {ok, R1} = chttpd:send_delayed_chunk(Resp, Buffer),
+ {ok, Acc#cacc{prepend = ",\r\n", buffer = Data, bufsize=Len, mochi = R1}};
+maybe_flush_changes_feed(Acc0, Data, Len) ->
+ #cacc{buffer = Buf, bufsize = Size} = Acc0,
+ Acc = Acc0#cacc{
+ prepend = ",\r\n",
+ buffer = [Buf | Data],
+ bufsize = Size + Len
+ },
+ {ok, Acc}.
+
is_old_couch(Resp) ->
MochiReq = chttpd:get_delayed_req(Resp),
case MochiReq:get_header_value("user-agent") of