You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ko...@apache.org on 2009/07/03 17:56:51 UTC

svn commit: r790953 - in /couchdb/trunk/src/ibrowse: ibrowse.erl ibrowse_http_client.erl ibrowse_test.erl

Author: kocolosk
Date: Fri Jul  3 15:56:51 2009
New Revision: 790953

URL: http://svn.apache.org/viewvc?rev=790953&view=rev
Log:
ibrowse now allows user to control socket.  Thanks again Chandru

Modified:
    couchdb/trunk/src/ibrowse/ibrowse.erl
    couchdb/trunk/src/ibrowse/ibrowse_http_client.erl
    couchdb/trunk/src/ibrowse/ibrowse_test.erl

Modified: couchdb/trunk/src/ibrowse/ibrowse.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/ibrowse/ibrowse.erl?rev=790953&r1=790952&r2=790953&view=diff
==============================================================================
--- couchdb/trunk/src/ibrowse/ibrowse.erl (original)
+++ couchdb/trunk/src/ibrowse/ibrowse.erl Fri Jul  3 15:56:51 2009
@@ -89,6 +89,7 @@
 	 send_req_direct/5,
 	 send_req_direct/6,
 	 send_req_direct/7,
+	 stream_next/1,
 	 set_max_sessions/3,
 	 set_max_pipeline_size/3,
 	 set_dest/3,
@@ -150,7 +151,8 @@
 %% respHeader() = {headerName(), headerValue()}
 %% headerName() = string()
 %% headerValue() = string()
-%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {error, Reason}
+%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason}
+%% req_id() = term()
 %% ResponseBody = string() | {file, Filename}
 %% Reason = term()
 send_req(Url, Headers, Method) ->
@@ -425,7 +427,20 @@
 	Err ->
 	    {error, {url_parsing_failed, Err}}
     end.
-    
+
+%% @doc Tell ibrowse to stream the next chunk of data to the
+%% caller. Should be used in conjunction with the
+%% <code>stream_to</code> option
+%% @spec stream_next(Req_id :: req_id()) -> ok | {error, unknown_req_id}
+stream_next(Req_id) ->    
+    case ets:lookup(ibrowse_stream, {req_id_pid, Req_id}) of
+	[] ->
+	    {error, unknown_req_id};
+	[{_, Pid}] ->
+	    catch Pid ! {stream_next, Req_id},
+	    ok
+    end.
+
 %% @doc Turn tracing on for the ibrowse process
 trace_on() ->
     ibrowse ! {trace, true}.
@@ -522,6 +537,7 @@
     put(ibrowse_trace_token, "ibrowse"),
     ets:new(ibrowse_lb, [named_table, public, {keypos, 2}]),
     ets:new(ibrowse_conf, [named_table, protected, {keypos, 2}]),
+    ets:new(ibrowse_stream, [named_table, public]),
     import_config(),
     {ok, #state{}}.
 
@@ -539,9 +555,9 @@
 	{ok, Terms} ->
 	    ets:delete_all_objects(ibrowse_conf),
 	    Fun = fun({dest, Host, Port, MaxSess, MaxPipe, Options}) 
-		     when list(Host), integer(Port),
-		     integer(MaxSess), MaxSess > 0,
-		     integer(MaxPipe), MaxPipe > 0, list(Options) ->
+		     when is_list(Host), is_integer(Port),
+		          is_integer(MaxSess), MaxSess > 0,
+		          is_integer(MaxPipe), MaxPipe > 0, is_list(Options) ->
 			  I = [{{max_sessions, Host, Port}, MaxSess},
 			       {{max_pipeline_size, Host, Port}, MaxPipe},
 			       {{options, Host, Port}, Options}],
@@ -641,13 +657,6 @@
 		      true ->
 			  catch Pid ! {trace, false}
 		  end;
-	     (#client_conn{key = {H, P, Pid}}, _) ->
-		  case lists:member({H, P}, Trace_on_dests) of
-		      false ->
-			  ok;
-		      true ->
-			  catch Pid ! {trace, false}
-		  end;
 	     (_, Acc) ->
 		  Acc
 	  end,
@@ -664,10 +673,6 @@
 	     when H == Host,
 		  P == Port ->
 		  catch Pid ! {trace, Bool};
-	     (#client_conn{key = {H, P, Pid}}, _)
-	     when H == Host,
-		  P == Port ->
-		  catch Pid ! {trace, Bool};
 	     (_, Acc) ->
 		  Acc
 	  end,

Modified: couchdb/trunk/src/ibrowse/ibrowse_http_client.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/ibrowse/ibrowse_http_client.erl?rev=790953&r1=790952&r2=790953&view=diff
==============================================================================
--- couchdb/trunk/src/ibrowse/ibrowse_http_client.erl (original)
+++ couchdb/trunk/src/ibrowse/ibrowse_http_client.erl Fri Jul  3 15:56:51 2009
@@ -47,11 +47,12 @@
 		is_closing, send_timer, content_length,
 		deleted_crlf = false, transfer_encoding,
 		chunk_size, chunk_size_buffer = <<>>, recvd_chunk_size,
-		lb_ets_tid, cur_pipeline_size = 0
+		lb_ets_tid, cur_pipeline_size = 0, prev_req_id
 	       }).
 
 -record(request, {url, method, options, from,
-		  stream_to, req_id,
+		  stream_to, caller_controls_socket = false, 
+		  req_id,
 		  stream_chunk_size,
 		  save_response_to_file = false, 
 		  tmp_file_name, tmp_file_fd,
@@ -126,144 +127,15 @@
 %%--------------------------------------------------------------------
 %% Received a request when the remote server has already sent us a
 %% Connection: Close header
-handle_call({send_req, _},
-	    _From,
-	    #state{is_closing=true}=State) ->
+handle_call({send_req, _}, _From, #state{is_closing = true} = State) ->
     {reply, {error, connection_closing}, State};
 
 handle_call({send_req, {Url, Headers, Method, Body, Options, Timeout}},
-	    From,
-	    #state{socket=undefined,
-		   host=Host, port=Port}=State) ->
-    Resp_format = get_value(response_format, Options, list),
-    {Host_1, Port_1, State_1} =
-	case get_value(proxy_host, Options, false) of
-	    false ->
-		{Host, Port, State};
-	    PHost ->
-		ProxyUser = get_value(proxy_user, Options, []),
-		ProxyPassword = get_value(proxy_password, Options, []),
-		Digest = http_auth_digest(ProxyUser, ProxyPassword),
-		{PHost, get_value(proxy_port, Options, 80),
-		 State#state{use_proxy = true,
-			     proxy_auth_digest = Digest}}
-	end,
-    StreamTo = get_value(stream_to, Options, undefined),
-    ReqId = make_req_id(),
-    SaveResponseToFile = get_value(save_response_to_file, Options, false),
-    NewReq = #request{url=Url,
-		      method=Method,
-		      stream_to=StreamTo,
-		      options=Options,
-		      req_id=ReqId,
-		      save_response_to_file = SaveResponseToFile,
-		      stream_chunk_size = get_stream_chunk_size(Options),
-		      response_format = Resp_format,
-		      from=From},
-    Reqs = queue:in(NewReq, State#state.reqs),
-    State_2 = check_ssl_options(Options, State_1#state{reqs = Reqs}),
-    do_trace("Connecting...~n", []),
-    Start_ts = now(),
-    Conn_timeout = get_value(connect_timeout, Options, Timeout),
-    case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
-	{ok, Sock} ->
-	    do_trace("Connected!~n", []),
-	    End_ts = now(),
-	    Ref = case Timeout of
-		      infinity ->
-			  undefined;
-		      _ ->
-			  Rem_time = Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000)),
-			  case Rem_time > 0 of
-			      true ->
-				  erlang:send_after(Rem_time, self(), {req_timedout, From});
-			      false ->
-				  shutting_down(State_2),
-				  do_error_reply(State_2, req_timedout),
-				  exit(normal)
-			  end
-		  end,
-	    case send_req_1(Url, Headers, Method, Body, Options, Sock, State_2) of
-		ok ->
-		    do_setopts(Sock, [{active, once}], State_2#state.is_ssl),
-		    case StreamTo of
-			undefined ->
-			    ok;
-			_ ->
-			    gen_server:reply(From, {ibrowse_req_id, ReqId})
-		    end,
-		    State_3 = inc_pipeline_counter(State_2#state{socket = Sock,
-								 send_timer = Ref,
-								 cur_req = NewReq,
-								 status = get_header}),
-		    {noreply, State_3, get_inac_timeout(State_3)};
-		Err ->
-		    shutting_down(State_2),
-		    do_trace("Send failed... Reason: ~p~n", [Err]),
-		    gen_server:reply(From, {error, send_failed}),
-		    {stop, normal, State_2}
-	    end;
-	Err ->
-	    shutting_down(State_2),
-	    do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
-	    gen_server:reply(From, {error, conn_failed}),
-	    {stop, normal, State_2}
-    end;
-
-%% Request which is to be pipelined
-handle_call({send_req, {Url, Headers, Method,
-			 Body, Options, Timeout}},
-	    From,
-	    #state{socket=Sock, status=Status, reqs=Reqs}=State) ->
-    do_trace("Recvd request in connected state. Status -> ~p NumPending: ~p~n", [Status, length(queue:to_list(Reqs))]),
-    Resp_format = get_value(response_format, Options, list),
-    StreamTo = get_value(stream_to, Options, undefined),
-    SaveResponseToFile = get_value(save_response_to_file, Options, false),
-    ReqId = make_req_id(),
-    NewReq = #request{url=Url,
-		      stream_to=StreamTo,
-		      method=Method,
-		      options=Options,
-		      req_id=ReqId,
-		      save_response_to_file = SaveResponseToFile,
-		      stream_chunk_size = get_stream_chunk_size(Options),
-		      response_format = Resp_format,
-		      from=From},
-    State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
-    case send_req_1(Url, Headers, Method, Body, Options, Sock, State_1) of
-	ok ->
-	    State_2 = inc_pipeline_counter(State_1),
-	    do_setopts(Sock, [{active, once}], State#state.is_ssl),
-	    case Timeout of
-		infinity ->
-		    ok;
-		_ ->
-		    erlang:send_after(Timeout, self(), {req_timedout, From})
-	    end,
-	    State_3 = case Status of
-			  idle ->
-			      State_2#state{status = get_header,
-					    cur_req = NewReq};
-			  _ ->
-			      State_2
-		      end,
-	    case StreamTo of
-		undefined ->
-		    ok;
-		_ ->
-		    gen_server:reply(From, {ibrowse_req_id, ReqId})
-	    end,
-	    {noreply, State_3, get_inac_timeout(State_3)};
-	Err ->
-	    shutting_down(State_1),
-	    do_trace("Send request failed: Reason: ~p~n", [Err]),
-	    gen_server:reply(From, {error, send_failed}),
-	    do_error_reply(State, send_failed),
-	    {stop, normal, State_1}
-    end;
+	    From, State) ->
+    send_req_1(From, Url, Headers, Method, Body, Options, Timeout, State);
 
-handle_call(stop, _From, #state{socket = Socket, is_ssl = Is_ssl} = State) ->
-    do_close(Socket, Is_ssl),
+handle_call(stop, _From, State) ->
+    do_close(State),
     do_error_reply(State, closing_on_request),
     {stop, normal, State};
 
@@ -294,6 +166,15 @@
 handle_info({ssl, _Sock, Data}, State) ->
     handle_sock_data(Data, State);
 
+handle_info({stream_next, Req_id}, #state{socket = Socket,
+					  is_ssl = Is_ssl,
+					  cur_req = #request{req_id = Req_id}} = State) ->
+    do_setopts(Socket, [{active, once}], Is_ssl),
+    {noreply, State};
+
+handle_info({stream_next, _Req_id}, State) ->
+    {noreply, State};
+
 handle_info({tcp_closed, _Sock}, State) ->
     do_trace("TCP connection closed by peer!~n", []),
     handle_sock_closed(State),
@@ -332,12 +213,7 @@
 %% Returns: any (ignored by gen_server)
 %%--------------------------------------------------------------------
 terminate(_Reason, State) ->
-    case State#state.socket of
-	undefined ->
-	    ok;
-	Sock ->
-	    do_close(Sock, State#state.is_ssl)
-    end.
+    do_close(State).
 
 %%--------------------------------------------------------------------
 %% Func: code_change/3
@@ -358,10 +234,10 @@
     do_trace("Data recvd on socket in state idle!. ~1000.p~n", [Data]),
     shutting_down(State),
     do_error_reply(State, data_in_status_idle),
-    do_close(State#state.socket, State#state.is_ssl),
+    do_close(State),
     {stop, normal, State};
 
-handle_sock_data(Data, #state{status=get_header, socket=Sock}=State) ->
+handle_sock_data(Data, #state{status = get_header}=State) ->
     case parse_response(Data, State) of
 	{error, _Reason} ->
 	    shutting_down(State),
@@ -370,14 +246,15 @@
 	    shutting_down(State),
 	    {stop, normal, State};
 	State_1 ->
-	    do_setopts(Sock, [{active, once}], State#state.is_ssl),
+	    active_once(State_1),
 	    {noreply, State_1, get_inac_timeout(State_1)}
     end;
 
-handle_sock_data(Data, #state{status=get_body, content_length=CL,
+handle_sock_data(Data, #state{status           = get_body,
+			      content_length   = CL,
 			      http_status_code = StatCode,
-			      recvd_headers=Headers,
-			      chunk_size=CSz, socket=Sock}=State) ->
+			      recvd_headers    = Headers,
+			      chunk_size       = CSz} = State) ->
     case (CL == undefined) and (CSz == undefined) of
 	true ->
 	    case accumulate_response(Data, State) of
@@ -387,7 +264,7 @@
 					    {error, {Reason, {stat_code, StatCode}, Headers}}),
 		    {stop, normal, State};
 		State_1 ->
-		    do_setopts(Sock, [{active, once}], State#state.is_ssl),
+		    active_once(State_1),
 		    {noreply, State_1, get_inac_timeout(State_1)}
 	    end;
 	_ ->
@@ -401,7 +278,7 @@
 		    shutting_down(State),
 		    {stop, normal, State};
 		State_1 ->
-		    do_setopts(Sock, [{active, once}], State#state.is_ssl),
+		    active_once(State_1),
 		    {noreply, State_1, get_inac_timeout(State_1)}
 	    end
     end.
@@ -452,22 +329,27 @@
 				 cur_req = CurReq}=State) ->
     #request{stream_to=StreamTo, req_id=ReqId,
 	     stream_chunk_size = Stream_chunk_size,
-	     response_format = Response_format} = CurReq,
+	     response_format = Response_format,
+	     caller_controls_socket = Caller_controls_socket} = CurReq,
     RepBuf_1 = concat_binary([RepBuf, Data]),
     New_data_size = RepBufSize - Streamed_size,
     case StreamTo of
 	undefined ->
 	    State#state{reply_buffer = RepBuf_1};
-	_ when New_data_size < Stream_chunk_size ->
-	    State#state{reply_buffer = RepBuf_1};
-	_ ->
+	_ when Caller_controls_socket == true ->
+	    do_interim_reply(StreamTo, Response_format, ReqId, RepBuf_1),
+	    State#state{reply_buffer = <<>>, 
+			streamed_size = Streamed_size + size(RepBuf_1)};
+	_ when New_data_size >= Stream_chunk_size ->
 	    {Stream_chunk, Rem_data} = split_binary(RepBuf_1, Stream_chunk_size),
 	    do_interim_reply(StreamTo, Response_format, ReqId, Stream_chunk),
 	    accumulate_response(
 	      Rem_data,
 	      State#state{
 		reply_buffer = <<>>,
-		streamed_size = Streamed_size + Stream_chunk_size})
+		streamed_size = Streamed_size + Stream_chunk_size});
+	_ ->
+	    State#state{reply_buffer = RepBuf_1}
     end.
 
 make_tmp_filename() ->
@@ -528,37 +410,45 @@
 		    [binary, {nodelay, true}, {active, false}],
 		    Timeout).
 
-do_send(Sock, Req, true)  ->  ssl:send(Sock, Req);
-do_send(Sock, Req, false) ->  gen_tcp:send(Sock, Req).
+do_send(Req, #state{socket = Sock, is_ssl = true})  ->  ssl:send(Sock, Req);
+do_send(Req, #state{socket = Sock, is_ssl = false}) ->  gen_tcp:send(Sock, Req).
 
 %% @spec do_send_body(Sock::socket_descriptor(), Source::source_descriptor(), IsSSL::boolean()) -> ok | error()
 %% source_descriptor() = fun_arity_0           |
 %%                       {fun_arity_0}         |
 %%                       {fun_arity_1, term()}
 %% error() = term()
-do_send_body(Sock, Source, IsSSL) when is_function(Source) ->
-    do_send_body(Sock, {Source}, IsSSL);
-do_send_body(Sock, {Source}, IsSSL) when is_function(Source) ->
-    do_send_body1(Sock, Source, IsSSL, Source());
-do_send_body(Sock, {Source, State}, IsSSL) when is_function(Source) ->
-    do_send_body1(Sock, Source, IsSSL, Source(State));
-do_send_body(Sock, Body, IsSSL) ->
-    do_send(Sock, Body, IsSSL).
+do_send_body(Source, State) when is_function(Source) ->
+    do_send_body({Source}, State);
+do_send_body({Source}, State) when is_function(Source) ->
+    do_send_body1(Source, Source(), State);
+do_send_body({Source, Source_state}, State) when is_function(Source) ->
+    do_send_body1(Source, Source(Source_state), State);
+do_send_body(Body, State) ->
+    do_send(Body, State).
 
-do_send_body1(Sock, Source, IsSSL, Resp) ->
+do_send_body1(Source, Resp, State) ->
     case Resp of
 	{ok, Data} ->
-	    do_send(Sock, Data, IsSSL),
-	    do_send_body(Sock, {Source}, IsSSL);
-	{ok, Data, NewState} ->
-	    do_send(Sock, Data, IsSSL),
-	    do_send_body(Sock, {Source, NewState}, IsSSL);
-	eof -> ok;
-	Err -> Err
+	    do_send(Data, State),
+	    do_send_body({Source}, State);
+	{ok, Data, New_source_state} ->
+	    do_send(Data, State),
+	    do_send_body({Source, New_source_state}, State);
+	eof ->
+	    ok;
+	Err ->
+	    Err
     end.
 
-do_close(Sock, true)  ->  ssl:close(Sock);
-do_close(Sock, false) ->  gen_tcp:close(Sock).
+do_close(#state{socket = undefined})            ->  ok;
+do_close(#state{socket = Sock, is_ssl = true})  ->  ssl:close(Sock);
+do_close(#state{socket = Sock, is_ssl = false}) ->  gen_tcp:close(Sock).
+
+active_once(#state{cur_req = #request{caller_controls_socket = true}}) ->
+    ok;
+active_once(#state{socket = Socket, is_ssl = Is_ssl}) ->
+    do_setopts(Socket, [{active, once}], Is_ssl).
 
 do_setopts(Sock, Opts, true)  ->  ssl:setopts(Sock, Opts);
 do_setopts(Sock, Opts, false) ->  inet:setopts(Sock, Opts).
@@ -571,11 +461,81 @@
 	    State#state{is_ssl=true, ssl_options=get_value(ssl_options, Options)}
     end.
 
-send_req_1(#url{abspath = AbsPath,
-		host = Host,
-		port = Port,
-		path = RelPath} = Url,
-	   Headers, Method, Body, Options, Sock, State) ->
+send_req_1(From,
+	   #url{host = Host,
+		port = Port} = Url,
+	   Headers, Method, Body, Options, Timeout,
+	   #state{socket = undefined} = State) ->
+    {Host_1, Port_1, State_1} =
+	case get_value(proxy_host, Options, false) of
+	    false ->
+		{Host, Port, State};
+	    PHost ->
+		ProxyUser     = get_value(proxy_user, Options, []),
+		ProxyPassword = get_value(proxy_password, Options, []),
+		Digest        = http_auth_digest(ProxyUser, ProxyPassword),
+		{PHost, get_value(proxy_port, Options, 80),
+		 State#state{use_proxy = true,
+			     proxy_auth_digest = Digest}}
+	end,
+    State_2 = check_ssl_options(Options, State_1),
+    do_trace("Connecting...~n", []),
+    Start_ts = now(),
+    Conn_timeout = get_value(connect_timeout, Options, Timeout),
+    case do_connect(Host_1, Port_1, Options, State_2, Conn_timeout) of
+	{ok, Sock} ->
+	    do_trace("Connected!~n", []),
+	    End_ts = now(),
+	    Timeout_1 = case Timeout of
+			    infinity ->
+				infinity;
+			    _ ->
+				Timeout - trunc(round(timer:now_diff(End_ts, Start_ts) / 1000))
+			end,
+	    State_3 = State_2#state{socket = Sock},
+	    send_req_1(From, Url, Headers, Method, Body, Options, Timeout_1, State_3);
+	Err ->
+	    shutting_down(State_2),
+	    do_trace("Error connecting. Reason: ~1000.p~n", [Err]),
+	    gen_server:reply(From, {error, conn_failed}),
+	    {stop, normal, State_2}
+    end;
+send_req_1(From,
+	   #url{abspath = AbsPath,
+		host    = Host,
+		port    = Port,
+		path    = RelPath} = Url,
+	   Headers, Method, Body, Options, Timeout,
+	   #state{status = Status} = State) ->
+    ReqId = make_req_id(),
+    Resp_format = get_value(response_format, Options, list),
+    {StreamTo, Caller_controls_socket} =
+	case get_value(stream_to, Options, undefined) of
+	    {Caller, once} when is_pid(Caller) or
+				is_atom(Caller) ->
+		Async_pid_rec = {{req_id_pid, ReqId}, self()},
+		true = ets:insert(ibrowse_stream, Async_pid_rec), 
+		{Caller, true};
+	    undefined ->
+		{undefined, false};
+	    Caller when is_pid(Caller) or
+			is_atom(Caller) ->
+		{Caller, false};
+	    Stream_to_inv ->
+		exit({invalid_option, {stream_to, Stream_to_inv}})
+	end,
+    SaveResponseToFile = get_value(save_response_to_file, Options, false),
+    NewReq = #request{url                    = Url,
+		      method                 = Method,
+		      stream_to              = StreamTo,
+		      caller_controls_socket = Caller_controls_socket,
+		      options                = Options,
+		      req_id                 = ReqId,
+		      save_response_to_file  = SaveResponseToFile,
+		      stream_chunk_size      = get_stream_chunk_size(Options),
+		      response_format        = Resp_format,
+		      from                   = From},
+    State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
     Headers_1 = add_auth_headers(Url, Options, Headers, State),
     HostHeaderValue = case lists:keysearch(host_header, 1, Options) of
 			  false ->
@@ -598,14 +558,45 @@
 		     "--- Request End ---~n", [NReq]);
 	_ -> ok
     end,
-    SndRes = case do_send(Sock, Req, State#state.is_ssl) of
-		 ok -> do_send_body(Sock, Body_1, State#state.is_ssl);
-		 Err ->
-		     io:format("Err: ~p~n", [Err]),
-		     Err
-	     end,
-    do_setopts(Sock, [{active, once}], State#state.is_ssl),
-    SndRes.
+    case do_send(Req, State) of
+	ok ->
+	    case do_send_body(Body_1, State) of
+		ok ->
+		    State_2 = inc_pipeline_counter(State_1),
+		    active_once(State_1),
+		    Ref = case Timeout of
+			      infinity ->
+				  undefined;
+			      _ ->
+				  erlang:send_after(Timeout, self(), {req_timedout, From})
+			  end,
+		    State_3 = case Status of
+				  idle ->
+				      State_2#state{status     = get_header,
+						    cur_req    = NewReq,
+						    send_timer = Ref};
+				  _ ->
+				      State_2#state{send_timer = Ref}
+			      end,
+		    case StreamTo of
+			undefined ->
+			    ok;
+			_ ->
+			    gen_server:reply(From, {ibrowse_req_id, ReqId})
+		    end,
+		    {noreply, State_3, get_inac_timeout(State_3)};
+		Err ->
+		    shutting_down(State_1),
+		    do_trace("Send failed... Reason: ~p~n", [Err]),
+		    gen_server:reply(From, {error, send_failed}),
+		    {stop, normal, State_1}
+	    end;
+	Err ->
+	    shutting_down(State_1),
+	    do_trace("Send failed... Reason: ~p~n", [Err]),
+	    gen_server:reply(From, {error, send_failed}),
+	    {stop, normal, State_1}
+    end.
 
 add_auth_headers(#url{username = User,
 		      password = UPw},
@@ -719,9 +710,9 @@
     encode_headers(L, []).
 encode_headers([{http_vsn, _Val} | T], Acc) ->
     encode_headers(T, Acc);
-encode_headers([{Name,Val} | T], Acc) when list(Name) ->
+encode_headers([{Name,Val} | T], Acc) when is_list(Name) ->
     encode_headers(T, [[Name, ": ", fmt_val(Val), crnl()] | Acc]);
-encode_headers([{Name,Val} | T], Acc) when atom(Name) ->
+encode_headers([{Name,Val} | T], Acc) when is_atom(Name) ->
     encode_headers(T, [[atom_to_list(Name), ": ", fmt_val(Val), crnl()] | Acc]);
 encode_headers([], Acc) ->
     lists:reverse(Acc).
@@ -732,25 +723,25 @@
 chunk_request_body(Body, _ChunkSize, Acc) when Body == <<>>; Body == [] ->
     LastChunk = "0\r\n",
     lists:reverse(["\r\n", LastChunk | Acc]);
-chunk_request_body(Body, ChunkSize, Acc) when binary(Body),
+chunk_request_body(Body, ChunkSize, Acc) when is_binary(Body),
                                               size(Body) >= ChunkSize ->
     <<ChunkBody:ChunkSize/binary, Rest/binary>> = Body,
     Chunk = [ibrowse_lib:dec2hex(4, ChunkSize),"\r\n",
 	     ChunkBody, "\r\n"],
     chunk_request_body(Rest, ChunkSize, [Chunk | Acc]);
-chunk_request_body(Body, _ChunkSize, Acc) when binary(Body) ->
+chunk_request_body(Body, _ChunkSize, Acc) when is_binary(Body) ->
     BodySize = size(Body),
     Chunk = [ibrowse_lib:dec2hex(4, BodySize),"\r\n",
 	     Body, "\r\n"],
     LastChunk = "0\r\n",
     lists:reverse(["\r\n", LastChunk, Chunk | Acc]);
-chunk_request_body(Body, ChunkSize, Acc) when list(Body),
+chunk_request_body(Body, ChunkSize, Acc) when is_list(Body),
                                               length(Body) >= ChunkSize ->
     {ChunkBody, Rest} = split_list_at(Body, ChunkSize),
     Chunk = [ibrowse_lib:dec2hex(4, ChunkSize),"\r\n",
 	     ChunkBody, "\r\n"],
     chunk_request_body(Rest, ChunkSize, [Chunk | Acc]);
-chunk_request_body(Body, _ChunkSize, Acc) when list(Body) ->
+chunk_request_body(Body, _ChunkSize, Acc) when is_list(Body) ->
     BodySize = length(Body),
     Chunk = [ibrowse_lib:dec2hex(4, BodySize),"\r\n",
 	     Body, "\r\n"],
@@ -840,7 +831,7 @@
 		    {error, content_length_undefined};
 		V ->
 		    case catch list_to_integer(V) of
-			V_1 when integer(V_1), V_1 >= 0 ->
+			V_1 when is_integer(V_1), V_1 >= 0 ->
 			    send_async_headers(ReqId, StreamTo, StatCode, Headers_1),
 			    do_trace("Recvd Content-Length of ~p~n", [V_1]),
 			    State_2 = State_1#state{rep_buf_size=0,
@@ -1058,17 +1049,20 @@
 parse_headers(Headers) ->
     case scan_crlf(Headers) of
 	{yes, StatusLine, T} ->
-	    Headers_1 = parse_headers_1(T),
-	    case parse_status_line(StatusLine) of
-		{ok, HttpVsn, StatCode, _Msg} ->
-		    put(http_prot_vsn, HttpVsn),
-		    {HttpVsn, StatCode, Headers_1};
-		_ -> %% A HTTP 0.9 response?
-		    put(http_prot_vsn, "HTTP/0.9"),
-		    {"HTTP/0.9", undefined, Headers}
-	    end;
-	_ ->
-	    {error, no_status_line}
+	    parse_headers(StatusLine, T);
+	{no, StatusLine} ->
+	    parse_headers(StatusLine, <<>>)
+    end.
+
+parse_headers(StatusLine, Headers) ->
+    Headers_1 = parse_headers_1(Headers),
+    case parse_status_line(StatusLine) of
+	{ok, HttpVsn, StatCode, _Msg} ->
+	    put(http_prot_vsn, HttpVsn),
+	    {HttpVsn, StatCode, Headers_1};
+	_ -> %% A HTTP 0.9 response?
+	    put(http_prot_vsn, "HTTP/0.9"),
+	    {"HTTP/0.9", undefined, Headers}
     end.
 
 % From RFC 2616
@@ -1079,10 +1073,10 @@
 %    SP. A recipient MAY replace any linear white space with a single
 %    SP before interpreting the field value or forwarding the message
 %    downstream.
-parse_headers_1(B) when is_binary(B) ->
-    parse_headers_1(binary_to_list(B));
-parse_headers_1(String) ->
-    parse_headers_1(String, [], []).
+	parse_headers_1(B) when is_binary(B) ->
+					   parse_headers_1(binary_to_list(B));
+	parse_headers_1(String) ->
+					   parse_headers_1(String, [], []).
 
 parse_headers_1([$\n, H |T], [$\r | L], Acc) when H == 32;
 						  H == $\t ->
@@ -1205,10 +1199,10 @@
 %% scan_crlf([H|T],  L)                    -> scan_crlf(T, [H|L]);
 %% scan_crlf([], L)                        -> {no, L}.
 
-fmt_val(L) when list(L)    -> L;
-fmt_val(I) when integer(I) -> integer_to_list(I);
-fmt_val(A) when atom(A)    -> atom_to_list(A);
-fmt_val(Term)              -> io_lib:format("~p", [Term]).
+fmt_val(L) when is_list(L)    -> L;
+fmt_val(I) when is_integer(I) -> integer_to_list(I);
+fmt_val(A) when is_atom(A)    -> atom_to_list(A);
+fmt_val(Term)                 -> io_lib:format("~p", [Term]).
 
 crnl() -> "\r\n".
 
@@ -1306,7 +1300,8 @@
 do_reply(State, From, undefined, _, _, Msg) ->
     gen_server:reply(From, Msg),
     dec_pipeline_counter(State);
-do_reply(State, _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
+do_reply(#state{prev_req_id = Prev_req_id} = State,
+	 _From, StreamTo, ReqId, Resp_format, {ok, _, _, Body}) ->
     State_1 = dec_pipeline_counter(State),
     case Body of
 	[] ->
@@ -1316,7 +1311,18 @@
 	    catch StreamTo ! {ibrowse_async_response, ReqId, Body_1}
     end,
     catch StreamTo ! {ibrowse_async_response_end, ReqId},
-    State_1;
+    %% We don't want to delete the Req-id to Pid mapping straightaway
+    %% as the client may send a stream_next message just while we are
+    %% sending back this ibrowse_async_response_end message. If we
+    %% deleted this mapping straightaway, the caller will see a
+    %% {error, unknown_req_id} when it calls ibrowse:stream_next/1. To
+    %% get around this, we store the req id, and clear it after the
+    %% next request. If there are weird combinations of stream,
+    %% stream_once and sync requests on the same connection, it will
+    %% take a while for the req_id-pid mapping to get cleared, but it
+    %% should do no harm.
+    ets:delete(ibrowse_stream, {req_id_pid, Prev_req_id}),
+    State_1#state{prev_req_id = ReqId};
 do_reply(State, _From, StreamTo, ReqId, Resp_format, Msg) ->
     State_1 = dec_pipeline_counter(State),
     Msg_1 = format_response_data(Resp_format, Msg),
@@ -1333,6 +1339,7 @@
     ReqList = queue:to_list(Reqs),
     lists:foreach(fun(#request{from=From, stream_to=StreamTo, req_id=ReqId,
 			       response_format = Resp_format}) ->
+			  ets:delete(ibrowse_stream, {req_id_pid, ReqId}),
                           do_reply(State, From, StreamTo, ReqId, Resp_format, {error, Err})
 		  end, ReqList).
 

Modified: couchdb/trunk/src/ibrowse/ibrowse_test.erl
URL: http://svn.apache.org/viewvc/couchdb/trunk/src/ibrowse/ibrowse_test.erl?rev=790953&r1=790952&r2=790953&view=diff
==============================================================================
--- couchdb/trunk/src/ibrowse/ibrowse_test.erl (original)
+++ couchdb/trunk/src/ibrowse/ibrowse_test.erl Fri Jul  3 15:56:51 2009
@@ -18,9 +18,50 @@
 	 ue_test/1,
 	 verify_chunked_streaming/0,
 	 verify_chunked_streaming/1,
-	 i_do_async_req_list/4
+	 i_do_async_req_list/4,
+	 test_stream_once/3,
+	 test_stream_once/4
 	]).
 
+test_stream_once(Url, Method, Options) ->
+    test_stream_once(Url, Method, Options, 5000).
+
+test_stream_once(Url, Method, Options, Timeout) ->
+    case ibrowse:send_req(Url, [], Method, [], [{stream_to, {self(), once}} | Options], Timeout) of
+	{ibrowse_req_id, Req_id} ->
+	    case ibrowse:stream_next(Req_id) of
+		ok ->
+		    test_stream_once(Req_id);
+		Err ->
+		    Err
+	    end;
+	Err ->
+	    Err
+    end.
+
+test_stream_once(Req_id) ->
+    receive
+	{ibrowse_async_headers, Req_id, StatCode, Headers} ->
+	    io:format("Recvd headers~n~p~n", [{ibrowse_async_headers, Req_id, StatCode, Headers}]),
+	    case ibrowse:stream_next(Req_id) of
+		ok ->
+		    test_stream_once(Req_id);
+		Err ->
+		    Err
+	    end;
+	{ibrowse_async_response, Req_id, {error, Err}} ->
+	    io:format("Recvd error: ~p~n", [Err]);
+	{ibrowse_async_response, Req_id, Body_1} ->
+	    io:format("Recvd body part: ~n~p~n", [{ibrowse_async_response, Req_id, Body_1}]),
+	    case ibrowse:stream_next(Req_id) of
+		ok ->
+		    test_stream_once(Req_id);
+		Err ->
+		    Err
+	    end;
+	{ibrowse_async_response_end, Req_id} ->
+	    ok
+    end.
 %% Use ibrowse:set_max_sessions/3 and ibrowse:set_max_pipeline_size/3 to
 %% tweak settings before running the load test. The defaults are 10 and 10.
 load_test(Url, NumWorkers, NumReqsPerWorker) when is_list(Url),
@@ -182,7 +223,8 @@
     unit_tests([]).
 
 unit_tests(Options) ->
-    {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options]),
+    Options_1 = Options ++ [{connect_timeout, 5000}],
+    {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1]),
     receive 
 	{done, Pid} ->
 	    ok;