You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ja...@apache.org on 2019/04/05 21:25:10 UTC

[couchdb-ibrowse] 04/20: Added stream_full_chunks option

This is an automated email from the ASF dual-hosted git repository.

jaydoane pushed a commit to branch handle-dead-ibrowse_lb-pids
in repository https://gitbox.apache.org/repos/asf/couchdb-ibrowse.git

commit fd3c3ad9bf6040506e4304751f5afdb1eec23713
Author: Patrick Schneider <pa...@meetnow.eu>
AuthorDate: Wed Oct 19 13:55:07 2016 +0200

    Added stream_full_chunks option
    
    With this option, the process given by stream_to receives one message containing the chunk data for each fully received chunk; stream_chunk_size is set to infinity when this option is enabled. This is useful for applications that stream live data in which each unit is represented by a single chunk; one example is CouchDB's continuous changes feed.
---
 doc/ibrowse.html            |  2 +-
 src/ibrowse.erl             |  1 +
 src/ibrowse_http_client.erl | 54 +++++++++++++++++++++++++++++++++------------
 3 files changed, 42 insertions(+), 15 deletions(-)

diff --git a/doc/ibrowse.html b/doc/ibrowse.html
index e7c5afd..b6738a6 100644
--- a/doc/ibrowse.html
+++ b/doc/ibrowse.html
@@ -208,7 +208,7 @@ send_req/4, send_req/5, send_req/6.</p>
 <h3 class="function"><a name="send_req-5">send_req/5</a></h3>
 <div class="spec">
 <p><tt>send_req(Url::string(), Headers::<a href="#type-headerList">headerList()</a>, Method::<a href="#type-method">method()</a>, Body::<a href="#type-body">body()</a>, Options::<a href="#type-optionList">optionList()</a>) -&gt; <a href="#type-response">response()</a></tt>
-<ul class="definitions"><li><tt><a name="type-optionList">optionList()</a> = [<a href="#type-option">option()</a>]</tt></li><li><tt><a name="type-option">option()</a> = {max_sessions, integer()} | {response_format, <a href="#type-response_format">response_format()</a>} | {stream_chunk_size, integer()} | {max_pipeline_size, integer()} | {trace, boolean()} | {is_ssl, boolean()} | {ssl_options, [SSLOpt]} | {pool_name, atom()} | {proxy_host, string()} | {proxy_port, integer()} | {proxy_user, [...]
+<ul class="definitions"><li><tt><a name="type-optionList">optionList()</a> = [<a href="#type-option">option()</a>]</tt></li><li><tt><a name="type-option">option()</a> = {max_sessions, integer()} | {response_format, <a href="#type-response_format">response_format()</a>} | {stream_full_chunks, boolean()} | {stream_chunk_size, integer()} | {max_pipeline_size, integer()} | {trace, boolean()} | {is_ssl, boolean()} | {ssl_options, [SSLOpt]} | {pool_name, atom()} | {proxy_host, string()} | {pro [...]
 </div><p>Same as send_req/4.</p>
 
 <h3 class="function"><a name="send_req-6">send_req/6</a></h3>
diff --git a/src/ibrowse.erl b/src/ibrowse.erl
index b243f36..648c698 100644
--- a/src/ibrowse.erl
+++ b/src/ibrowse.erl
@@ -272,6 +272,7 @@ send_req(Url, Headers, Method, Body) ->
 %% optionList() = [option()]
 %% option() = {max_sessions, integer()}        |
 %%          {response_format,response_format()}|
+%%          {stream_full_chunks, boolean()}    |
 %%          {stream_chunk_size, integer()}     |
 %%          {max_pipeline_size, integer()}     |
 %%          {trace, boolean()}                 | 
diff --git a/src/ibrowse_http_client.erl b/src/ibrowse_http_client.erl
index 7b060aa..867117c 100644
--- a/src/ibrowse_http_client.erl
+++ b/src/ibrowse_http_client.erl
@@ -61,6 +61,7 @@
                   stream_to, caller_controls_socket = false,
                   caller_socket_options = [],
                   req_id,
+                  stream_full_chunks = false,
                   stream_chunk_size,
                   save_response_to_file = false,
                   tmp_file_name, tmp_file_fd, preserve_chunked_encoding,
@@ -909,6 +910,7 @@ send_req_1(From,
                       options                = Options,
                       req_id                 = ReqId,
                       save_response_to_file  = SaveResponseToFile,
+                      stream_full_chunks     = get_value(stream_full_chunks, Options, false),
                       stream_chunk_size      = get_stream_chunk_size(Options),
                       response_format        = Resp_format,
                       from                   = From,
@@ -1451,22 +1453,41 @@ parse_11_response(DataRecvd,
                   #state{transfer_encoding = chunked,
                          chunk_size = CSz,
                          recvd_chunk_size = Recvd_csz,
-                         rep_buf_size = RepBufSz} = State) ->
+                         reply_buffer = RepBuf,
+                         rep_buf_size = RepBufSz,
+                         streamed_size = Streamed_size,
+                         cur_req = CurReq} = State) ->
     NeedBytes = CSz - Recvd_csz,
     DataLen = size(DataRecvd),
     do_trace("Recvd more data: size: ~p. NeedBytes: ~p~n", [DataLen, NeedBytes]),
     case DataLen >= NeedBytes of
         true ->
             {RemChunk, RemData} = split_binary(DataRecvd, NeedBytes),
-            do_trace("Recvd another chunk...~p~n", [RemChunk]),
-            do_trace("RemData -> ~p~n", [RemData]),
-            case accumulate_response(RemChunk, State) of
-                {error, Reason} ->
-                    do_trace("Error accumulating response --> ~p~n", [Reason]),
-                    {error, Reason};
-                #state{} = State_1 ->
-                    State_2 = State_1#state{chunk_size=tbd},
-                    parse_11_response(RemData, State_2)
+            case CurReq of
+                #request{stream_to = StreamTo, caller_controls_socket = false, req_id = ReqId, stream_full_chunks = true, response_format = Response_format} ->
+                    Chunk = <<RepBuf/binary, RemChunk/binary>>,
+                    do_trace("Recvd another chunk...~p~n", [Chunk]),
+                    do_trace("RemData -> ~p~n", [RemData]),
+                    do_interim_reply(StreamTo, Response_format, ReqId, Chunk),
+                    State_1 = State#state{
+                                reply_buffer = <<>>,
+                                rep_buf_size = RepBufSz + size(RemChunk),
+                                interim_reply_sent = true,
+                                streamed_size = Streamed_size + CSz,
+                                chunk_size = tbd,
+                                recvd_chunk_size = 0},
+                    parse_11_response(RemData, State_1);
+                _ ->
+                    do_trace("Recvd another chunk...~p~n", [RemChunk]),
+                    do_trace("RemData -> ~p~n", [RemData]),
+                    case accumulate_response(RemChunk, State) of
+                        {error, Reason} ->
+                            do_trace("Error accumulating response --> ~p~n", [Reason]),
+                            {error, Reason};
+                        #state{} = State_1 ->
+                            State_2 = State_1#state{chunk_size=tbd},
+                            parse_11_response(RemData, State_2)
+                    end
             end;
         false ->
             accumulate_response(DataRecvd,
@@ -2065,11 +2086,16 @@ flatten([]) ->
     [].
 
 get_stream_chunk_size(Options) ->
-    case lists:keysearch(stream_chunk_size, 1, Options) of
-        {value, {_, V}} when V > 0 ->
-            V;
+    case get_value(stream_full_chunks, Options, false) of
+        true ->
+            infinity;
         _ ->
-            ?DEFAULT_STREAM_CHUNK_SIZE
+            case lists:keysearch(stream_chunk_size, 1, Options) of
+                {value, {_, V}} when V > 0 ->
+                    V;
+                _ ->
+                    ?DEFAULT_STREAM_CHUNK_SIZE
+            end
     end.
 
 set_inac_timer(State) ->