You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by ja...@apache.org on 2012/11/14 20:13:43 UTC

git commit: update ibrowse to 4.0.1

Updated Branches:
  refs/heads/1603-update-ibrowse [created] 38e0fb2ee


update ibrowse to 4.0.1


Project: http://git-wip-us.apache.org/repos/asf/couchdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb/commit/38e0fb2e
Tree: http://git-wip-us.apache.org/repos/asf/couchdb/tree/38e0fb2e
Diff: http://git-wip-us.apache.org/repos/asf/couchdb/diff/38e0fb2e

Branch: refs/heads/1603-update-ibrowse
Commit: 38e0fb2eeb057da9271fc2e5b8919720a6c6c962
Parents: d7f2037
Author: Jan Lehnardt <ja...@apache.org>
Authored: Wed Nov 14 20:13:52 2012 +0100
Committer: Jan Lehnardt <ja...@apache.org>
Committed: Wed Nov 14 20:13:52 2012 +0100

----------------------------------------------------------------------
 src/ibrowse/ibrowse.app.in          |   14 +-
 src/ibrowse/ibrowse.erl             |  250 ++++++++++++++++++----------
 src/ibrowse/ibrowse_http_client.erl |  268 ++++++++++++++++++------------
 src/ibrowse/ibrowse_lb.erl          |   91 ++++++----
 src/ibrowse/ibrowse_lib.erl         |   74 +++++++--
 src/ibrowse/ibrowse_test.erl        |  132 ++++++++++++++-
 6 files changed, 565 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb/blob/38e0fb2e/src/ibrowse/ibrowse.app.in
----------------------------------------------------------------------
diff --git a/src/ibrowse/ibrowse.app.in b/src/ibrowse/ibrowse.app.in
index af46d8a..1d88084 100644
--- a/src/ibrowse/ibrowse.app.in
+++ b/src/ibrowse/ibrowse.app.in
@@ -1,13 +1,7 @@
 {application, ibrowse,
-        [{description, "HTTP client application"},
-         {vsn, "2.2.0"},
-         {modules, [ ibrowse, 
-		     ibrowse_http_client, 
-		     ibrowse_app, 
-		     ibrowse_sup, 
-		     ibrowse_lib,
-		     ibrowse_lb ]},
-         {registered, []},
-         {applications, [kernel,stdlib,sasl]},
+        [{description, "Erlang HTTP client application"},
+         {vsn, "4.0.1"},
+         {registered, [ibrowse_sup, ibrowse]},
+         {applications, [kernel,stdlib]},
 	 {env, []},
 	 {mod, {ibrowse_app, []}}]}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/38e0fb2e/src/ibrowse/ibrowse.erl
----------------------------------------------------------------------
diff --git a/src/ibrowse/ibrowse.erl b/src/ibrowse/ibrowse.erl
index f70f92f..80a4282 100644
--- a/src/ibrowse/ibrowse.erl
+++ b/src/ibrowse/ibrowse.erl
@@ -6,8 +6,7 @@
 %%% Created : 11 Oct 2003 by Chandrashekhar Mullaparthi <ch...@t-mobile.co.uk>
 %%%-------------------------------------------------------------------
 %% @author Chandrashekhar Mullaparthi <chandrashekhar dot mullaparthi at gmail dot com>
-%% @copyright 2005-2011 Chandrashekhar Mullaparthi
-%% @version 2.1.3
+%% @copyright 2005-2012 Chandrashekhar Mullaparthi
 %% @doc The ibrowse application implements an HTTP 1.1 client in erlang. This
 %% module implements the API of the HTTP client. There is one named
 %% process called 'ibrowse' which assists in load balancing and maintaining configuration. There is one load balancing process per unique webserver. There is
@@ -71,6 +70,7 @@
 -export([
          rescan_config/0,
          rescan_config/1,
+         add_config/1,
          get_config_value/1,
          get_config_value/2,
          spawn_worker_process/1,
@@ -97,7 +97,10 @@
          trace_off/2,
          all_trace_off/0,
          show_dest_status/0,
-         show_dest_status/2
+         show_dest_status/1,
+         show_dest_status/2,
+         get_metrics/0,
+         get_metrics/2
         ]).
 
 -ifdef(debug).
@@ -136,7 +139,12 @@ start() ->
 
 %% @doc Stop the ibrowse process. Useful when testing using the shell.
 stop() ->
-    catch gen_server:call(ibrowse, stop).
+    case catch gen_server:call(ibrowse, stop) of
+        {'EXIT',{noproc,_}} ->
+            ok;
+        Res ->
+            Res
+    end.
 
 %% @doc This is the basic function to send a HTTP request.
 %% The Status return value indicates the HTTP status code returned by the webserver
@@ -277,7 +285,8 @@ send_req(Url, Headers, Method, Body) ->
 %%          {transfer_encoding, {chunked, ChunkSize}} | 
 %%          {headers_as_is, boolean()}         |
 %%          {give_raw_headers, boolean()}      |
-%%          {preserve_chunked_encoding,boolean()}
+%%          {preserve_chunked_encoding,boolean()}     |
+%%          {workaround, head_response_with_body}
 %%
 %% stream_to() = process() | {process(), once}
 %% process() = pid() | atom()
@@ -287,7 +296,7 @@ send_req(Url, Headers, Method, Body) ->
 %% Sock_opts = [Sock_opt]
 %% Sock_opt = term()
 %% ChunkSize = integer()
-%% srtf() = boolean() | filename()
+%% srtf() = boolean() | filename() | {append, filename()}
 %% filename() = string()
 %% response_format() = list | binary
 send_req(Url, Headers, Method, Body, Options) ->
@@ -354,15 +363,16 @@ try_routing_request(_, _, _, _, _, _, _, _, _, _, _) ->
     {error, retry_later}.
 
 merge_options(Host, Port, Options) ->
-    Config_options = get_config_value({options, Host, Port}, []),
+    Config_options = get_config_value({options, Host, Port}, []) ++
+                     get_config_value({options, global}, []),
     lists:foldl(
       fun({Key, Val}, Acc) ->
-                        case lists:keysearch(Key, 1, Options) of
-                            false ->
-                                [{Key, Val} | Acc];
-                            _ ->
-                                Acc
-                        end
+              case lists:keysearch(Key, 1, Options) of
+                  false ->
+                      [{Key, Val} | Acc];
+                  _ ->
+                      Acc
+              end
       end, Options, Config_options).
 
 get_lb_pid(Url) ->
@@ -426,6 +436,8 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) ->
             {error, req_timedout};
         {'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} ->
             {error, sel_conn_closed};
+        {'EXIT', {normal, _}} ->
+            {error, req_timedout};
         {error, connection_closed} ->
             {error, sel_conn_closed};
         {'EXIT', Reason} ->
@@ -581,74 +593,98 @@ all_trace_off() ->
 %% about workers spawned using spawn_worker_process/2 or
 %% spawn_link_worker_process/2 is not included.
 show_dest_status() ->
-    Dests = lists:filter(fun({lb_pid, {Host, Port}, _}) when is_list(Host),
-                                                             is_integer(Port) ->
-                                 true;
-                            (_) ->
-                                 false
-                         end, ets:tab2list(ibrowse_lb)),
-    All_ets = ets:all(),
     io:format("~-40.40s | ~-5.5s | ~-10.10s | ~s~n",
               ["Server:port", "ETS", "Num conns", "LB Pid"]),
     io:format("~80.80.=s~n", [""]),
-    lists:foreach(fun({lb_pid, {Host, Port}, Lb_pid}) ->
-                          case lists:dropwhile(
-                                 fun(Tid) ->
-                                         ets:info(Tid, owner) /= Lb_pid
-                                 end, All_ets) of
-                              [] ->
-                                  io:format("~40.40s | ~-5.5s | ~-5.5s | ~s~n",
-                                            [Host ++ ":" ++ integer_to_list(Port),
-                                             "",
-                                             "",
-                                             io_lib:format("~p", [Lb_pid])]
-                                           );
-                              [Tid | _] ->
-                                  catch (
-                                    begin
-                                        Size = ets:info(Tid, size),
-                                        io:format("~40.40s | ~-5.5s | ~-5.5s | ~s~n",
-                                                  [Host ++ ":" ++ integer_to_list(Port),
-                                                   io_lib:format("~p", [Tid]),
-                                                   integer_to_list(Size),
-                                                   io_lib:format("~p", [Lb_pid])]
-                                                 )
-                                    end
-                                   )
-                                  end
-                  end, Dests).
-                                          
+    Metrics = get_metrics(),
+    lists:foreach(
+      fun({Host, Port, Lb_pid, Tid, Size}) ->
+              io:format("~40.40s | ~-5.5s | ~-5.5s | ~p~n",
+                        [Host ++ ":" ++ integer_to_list(Port),
+                         integer_to_list(Tid),
+                         integer_to_list(Size), 
+                         Lb_pid])
+      end, Metrics).
+
+show_dest_status(Url) ->                                          
+    #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url),
+    show_dest_status(Host, Port).
+
 %% @doc Shows some internal information about load balancing to a
 %% specified Host:Port. Info about workers spawned using
 %% spawn_worker_process/2 or spawn_link_worker_process/2 is not
 %% included.
 show_dest_status(Host, Port) ->
+    case get_metrics(Host, Port) of
+        {Lb_pid, MsgQueueSize, Tid, Size,
+         {{First_p_sz, First_speculative_sz},
+          {Last_p_sz, Last_speculative_sz}}} ->
+            io:format("Load Balancer Pid     : ~p~n"
+                      "LB process msg q size : ~p~n"
+                      "LB ETS table id       : ~p~n"
+                      "Num Connections       : ~p~n"
+                      "Smallest pipeline     : ~p:~p~n"
+                      "Largest pipeline      : ~p:~p~n",
+                      [Lb_pid, MsgQueueSize, Tid, Size, 
+                       First_p_sz, First_speculative_sz,
+                       Last_p_sz, Last_speculative_sz]);
+        _Err ->
+            io:format("Metrics not available~n", [])
+    end.
+
+get_metrics() ->
+    Dests = lists:filter(fun({lb_pid, {Host, Port}, _}) when is_list(Host),
+                                                             is_integer(Port) ->
+                                 true;
+                            (_) ->
+                                 false
+                         end, ets:tab2list(ibrowse_lb)),
+    All_ets = ets:all(),
+    lists:map(fun({lb_pid, {Host, Port}, Lb_pid}) ->
+                  case lists:dropwhile(
+                         fun(Tid) ->
+                                 ets:info(Tid, owner) /= Lb_pid
+                         end, All_ets) of
+                      [] ->
+                          {Host, Port, Lb_pid, unknown, 0};
+                      [Tid | _] ->
+                          Size = case catch (ets:info(Tid, size)) of
+                                     N when is_integer(N) -> N;
+                                     _ -> 0
+                                 end,
+                          {Host, Port, Lb_pid, Tid, Size}
+                  end
+              end, Dests).
+
+get_metrics(Host, Port) ->
     case ets:lookup(ibrowse_lb, {Host, Port}) of
         [] ->
             no_active_processes;
         [#lb_pid{pid = Lb_pid}] ->
-            io:format("Load Balancer Pid     : ~p~n", [Lb_pid]),
-            io:format("LB process msg q size : ~p~n", [(catch process_info(Lb_pid, message_queue_len))]),
+            MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)),
+            %% {Lb_pid, MsgQueueSize,
             case lists:dropwhile(
                    fun(Tid) ->
                            ets:info(Tid, owner) /= Lb_pid
                    end, ets:all()) of
                 [] ->
-                    io:format("Couldn't locate ETS table for ~p~n", [Lb_pid]);
+                    {Lb_pid, MsgQueueSize, unknown, 0, unknown};
                 [Tid | _] ->
-                    First = ets:first(Tid),
-                    Last = ets:last(Tid),
-                    Size = ets:info(Tid, size),
-                    io:format("LB ETS table id       : ~p~n", [Tid]),
-                    io:format("Num Connections       : ~p~n", [Size]),
-                    case Size of
-                        0 ->
-                            ok;
-                        _ ->
-                            {First_p_sz, _} = First,
-                            {Last_p_sz, _} = Last,
-                            io:format("Smallest pipeline     : ~1000.p~n", [First_p_sz]),
-                            io:format("Largest pipeline      : ~1000.p~n", [Last_p_sz])
+                    try
+                        Size = ets:info(Tid, size),
+                        case Size of
+                            0 ->
+                                ok;
+                            _ ->
+                                First = ets:first(Tid),
+                                Last = ets:last(Tid),
+                                [{_, First_p_sz, First_speculative_sz}] = ets:lookup(Tid, First),
+                                [{_, Last_p_sz, Last_speculative_sz}] = ets:lookup(Tid, Last),
+                                {Lb_pid, MsgQueueSize, Tid, Size,
+                                 {{First_p_sz, First_speculative_sz}, {Last_p_sz, Last_speculative_sz}}}
+                        end
+                    catch _:_ ->
+                            not_available
                     end
             end
     end.
@@ -663,9 +699,15 @@ rescan_config() ->
 %% Clear current configuration for ibrowse and load from the specified
 %% file. Current configuration is cleared only if the specified
 %% file is readable using file:consult/1
+rescan_config([{_,_}|_]=Terms) ->
+    gen_server:call(?MODULE, {rescan_config_terms, Terms});
 rescan_config(File) when is_list(File) ->
     gen_server:call(?MODULE, {rescan_config, File}).
 
+%% @doc Add additional configuration elements at runtime.
+add_config([{_,_}|_]=Terms) ->
+    gen_server:call(?MODULE, {add_config_terms, Terms}).
+
 %%====================================================================
 %% Server functions
 %%====================================================================
@@ -701,44 +743,60 @@ import_config() ->
 import_config(Filename) ->
     case file:consult(Filename) of
         {ok, Terms} ->
-            ets:delete_all_objects(ibrowse_conf),
-            Fun = fun({dest, Host, Port, MaxSess, MaxPipe, Options}) 
-                     when is_list(Host), is_integer(Port),
-                          is_integer(MaxSess), MaxSess > 0,
-                          is_integer(MaxPipe), MaxPipe > 0, is_list(Options) ->
-                          I = [{{max_sessions, Host, Port}, MaxSess},
-                               {{max_pipeline_size, Host, Port}, MaxPipe},
-                               {{options, Host, Port}, Options}],
-                          lists:foreach(
-                            fun({X, Y}) ->
-                                    ets:insert(ibrowse_conf,
-                                               #ibrowse_conf{key = X, 
-                                                             value = Y})
-                            end, I);
-                     ({K, V}) ->
-                          ets:insert(ibrowse_conf,
-                                     #ibrowse_conf{key = K,
-                                                   value = V});
-                     (X) ->
-                          io:format("Skipping unrecognised term: ~p~n", [X])
-                  end,
-            lists:foreach(Fun, Terms);
+            apply_config(Terms);
         _Err ->
             ok
     end.
 
+apply_config(Terms) ->
+    ets:delete_all_objects(ibrowse_conf),
+    insert_config(Terms).
+
+insert_config(Terms) ->
+    Fun = fun({dest, Host, Port, MaxSess, MaxPipe, Options}) 
+             when is_list(Host), is_integer(Port),
+                  is_integer(MaxSess), MaxSess > 0,
+                  is_integer(MaxPipe), MaxPipe > 0, is_list(Options) ->
+                  I = [{{max_sessions, Host, Port}, MaxSess},
+                       {{max_pipeline_size, Host, Port}, MaxPipe},
+                       {{options, Host, Port}, Options}],
+                  lists:foreach(
+                    fun({X, Y}) ->
+                            ets:insert(ibrowse_conf,
+                                       #ibrowse_conf{key = X, 
+                                                     value = Y})
+                    end, I);
+             ({K, V}) ->
+                  ets:insert(ibrowse_conf,
+                             #ibrowse_conf{key = K,
+                                           value = V});
+             (X) ->
+                  io:format("Skipping unrecognised term: ~p~n", [X])
+          end,
+    lists:foreach(Fun, Terms).
+
 %% @doc Internal export
 get_config_value(Key) ->
-    [#ibrowse_conf{value = V}] = ets:lookup(ibrowse_conf, Key),
-    V.
+    try
+        [#ibrowse_conf{value = V}] = ets:lookup(ibrowse_conf, Key),
+        V
+    catch
+        error:badarg ->
+            throw({error, ibrowse_not_running})
+    end.
 
 %% @doc Internal export
 get_config_value(Key, DefVal) ->
-    case ets:lookup(ibrowse_conf, Key) of
-        [] ->
-            DefVal;
-        [#ibrowse_conf{value = V}] ->
-            V
+    try
+        case ets:lookup(ibrowse_conf, Key) of
+            [] ->
+                DefVal;
+            [#ibrowse_conf{value = V}] ->
+                V
+        end
+    catch
+        error:badarg ->
+            throw({error, ibrowse_not_running})
     end.
 
 set_config_value(Key, Val) ->
@@ -777,6 +835,14 @@ handle_call({rescan_config, File}, _From, State) ->
     Ret = (catch import_config(File)),
     {reply, Ret, State};
 
+handle_call({rescan_config_terms, Terms}, _From, State) ->
+    Ret = (catch apply_config(Terms)),
+    {reply, Ret, State};
+
+handle_call({add_config_terms, Terms}, _From, State) ->
+    Ret = (catch insert_config(Terms)),
+    {reply, Ret, State};
+
 handle_call(Request, _From, State) ->
     Reply = {unknown_request, Request},
     {reply, Reply, State}.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/38e0fb2e/src/ibrowse/ibrowse_http_client.erl
----------------------------------------------------------------------
diff --git a/src/ibrowse/ibrowse_http_client.erl b/src/ibrowse/ibrowse_http_client.erl
index 00e8ed3..c01385a 100644
--- a/src/ibrowse/ibrowse_http_client.erl
+++ b/src/ibrowse/ibrowse_http_client.erl
@@ -47,7 +47,7 @@
                 reply_buffer = <<>>, rep_buf_size=0, streamed_size = 0,
                 recvd_headers=[],
                 status_line, raw_headers,
-                is_closing, send_timer, content_length,
+                is_closing, content_length,
                 deleted_crlf = false, transfer_encoding,
                 chunk_size, chunk_size_buffer = <<>>,
                 recvd_chunk_size, interim_reply_sent = false,
@@ -61,7 +61,7 @@
                   stream_chunk_size,
                   save_response_to_file = false,
                   tmp_file_name, tmp_file_fd, preserve_chunked_encoding,
-                  response_format}).
+                  response_format, timer_ref}).
 
 -import(ibrowse_lib, [
                       get_value/2,
@@ -118,7 +118,7 @@ init({Lb_Tid, #url{host = Host, port = Port}, {SSLOptions, Is_ssl}}) ->
                    lb_ets_tid = Lb_Tid},
     put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
     put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
-    {ok, State};
+    {ok, set_inac_timer(State)};
 init(Url) when is_list(Url) ->
     case catch ibrowse_lib:parse_url(Url) of
         #url{protocol = Protocol} = Url_rec ->
@@ -131,7 +131,7 @@ init({Host, Port}) ->
                    port = Port},
     put(ibrowse_trace_token, [Host, $:, integer_to_list(Port)]),
     put(my_trace_flag, ibrowse_lib:get_trace_status(Host, Port)),
-    {ok, State}.
+    {ok, set_inac_timer(State)}.
 
 %%--------------------------------------------------------------------
 %% Function: handle_call/3
@@ -179,7 +179,6 @@ handle_cast(_Msg, State) ->
 %%          {stop, Reason, State}            (terminate/2 is called)
 %%--------------------------------------------------------------------
 handle_info({tcp, _Sock, Data}, #state{status = Status} = State) ->
-%%    io:format("Recvd data: ~p~n", [Data]),
     do_trace("Data recvd in state: ~p. Size: ~p. ~p~n~n", [Status, size(Data), Data]),
     handle_sock_data(Data, State);
 handle_info({ssl, _Sock, Data}, State) ->
@@ -187,7 +186,6 @@ handle_info({ssl, _Sock, Data}, State) ->
 
 handle_info({stream_next, Req_id}, #state{socket = Socket,
                                           cur_req = #request{req_id = Req_id}} = State) ->
-    %% io:format("Client process set {active, once}~n", []),
     do_setopts(Socket, [{active, once}], State),
     {noreply, set_inac_timer(State)};
 
@@ -198,8 +196,6 @@ handle_info({stream_next, _Req_id}, State) ->
                      _ ->
                          undefined
                  end,
-%%     io:format("Ignoring stream_next as ~1000.p is not cur req (~1000.p)~n",
-%%               [_Req_id, _Cur_req_id]),
     {noreply, State};
 
 handle_info({stream_close, _Req_id}, State) ->
@@ -234,7 +230,7 @@ handle_info({req_timedout, From}, State) ->
             {noreply, State};
         true ->
             shutting_down(State),
-            do_error_reply(State, req_timedout),
+%%            do_error_reply(State, req_timedout),
             {stop, normal, State}
     end;
 
@@ -357,7 +353,8 @@ accumulate_response(Data,
                                          tmp_file_fd = undefined} = CurReq,
                       http_status_code=[$2 | _]}=State) when Srtf /= false ->
     TmpFilename = make_tmp_filename(Srtf),
-    case file:open(TmpFilename, [write, delayed_write, raw]) of
+    Mode = file_mode(Srtf),
+    case file:open(TmpFilename, [Mode, delayed_write, raw]) of
         {ok, Fd} ->
             accumulate_response(Data, State#state{
                                         cur_req = CurReq#request{
@@ -434,8 +431,13 @@ make_tmp_filename(true) ->
                    integer_to_list(B) ++
                    integer_to_list(C)]);
 make_tmp_filename(File) when is_list(File) ->
+    File;
+make_tmp_filename({append, File}) when is_list(File) ->
     File.
 
+file_mode({append, _File}) -> append;
+file_mode(_Srtf) -> write.
+
 
 %%--------------------------------------------------------------------
 %% Handles the case when the server closes the socket
@@ -560,9 +562,13 @@ do_send_body(Body, State, _TE) ->
 
 do_send_body1(Source, Resp, State, TE) ->
     case Resp of
+                {ok, Data} when Data == []; Data == <<>> ->
+                        do_send_body({Source}, State, TE);
         {ok, Data} ->
             do_send(maybe_chunked_encode(Data, TE), State),
             do_send_body({Source}, State, TE);
+                {ok, Data, New_source_state} when Data == []; Data == <<>> ->
+                        do_send_body({Source, New_source_state}, State, TE);
         {ok, Data, New_source_state} ->
             do_send(maybe_chunked_encode(Data, TE), State),
             do_send_body({Source, New_source_state}, State, TE);
@@ -658,10 +664,17 @@ send_req_1(From,
                   proxy_tunnel_setup = false,
                   use_proxy = true,
                   is_ssl    = true} = State) ->
+    Ref = case Timeout of
+              infinity ->
+                  undefined;
+              _ ->
+                  erlang:send_after(Timeout, self(), {req_timedout, From})
+          end,
     NewReq = #request{
       method                    = connect,
       preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false),
-      options                   = Options
+      options                   = Options,
+      timer_ref                 = Ref
      },
     State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
     Pxy_auth_headers = maybe_modify_headers(Url, Method, Options, [], State_1),
@@ -677,17 +690,11 @@ send_req_1(From,
                 ok ->
                     trace_request_body(Body_1),
                     active_once(State_1),
-                    Ref = case Timeout of
-                              infinity ->
-                                  undefined;
-                              _ ->
-                                  erlang:send_after(Timeout, self(), {req_timedout, From})
-                          end,
-                    State_2 = State_1#state{status     = get_header,
-                                            cur_req    = NewReq,
-                                            send_timer = Ref,
-                                            proxy_tunnel_setup = in_progress,
-                                            tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
+                    State_1_1 = inc_pipeline_counter(State_1),
+                    State_2 = State_1_1#state{status     = get_header,
+                                              cur_req    = NewReq,
+                                              proxy_tunnel_setup = in_progress,
+                                              tunnel_setup_queue = [{From, Url, Headers, Method, Body, Options, Timeout}]},
                     State_3 = set_inac_timer(State_2),
                     {noreply, State_3};
                 Err ->
@@ -738,6 +745,12 @@ send_req_1(From,
                 exit({invalid_option, {stream_to, Stream_to_inv}})
         end,
     SaveResponseToFile = get_value(save_response_to_file, Options, false),
+    Ref = case Timeout of
+              infinity ->
+                  undefined;
+              _ ->
+                  erlang:send_after(Timeout, self(), {req_timedout, From})
+          end,
     NewReq = #request{url                    = Url,
                       method                 = Method,
                       stream_to              = StreamTo,
@@ -749,7 +762,8 @@ send_req_1(From,
                       stream_chunk_size      = get_stream_chunk_size(Options),
                       response_format        = Resp_format,
                       from                   = From,
-                      preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false)
+                      preserve_chunked_encoding = get_value(preserve_chunked_encoding, Options, false),
+                      timer_ref              = Ref
                      },
     State_1 = State#state{reqs=queue:in(NewReq, State#state.reqs)},
     Headers_1 = maybe_modify_headers(Url, Method, Options, Headers, State_1),
@@ -767,19 +781,12 @@ send_req_1(From,
                     trace_request_body(Body_1),
                     State_2 = inc_pipeline_counter(State_1),
                     active_once(State_2),
-                    Ref = case Timeout of
-                              infinity ->
-                                  undefined;
-                              _ ->
-                                  erlang:send_after(Timeout, self(), {req_timedout, From})
-                          end,
                     State_3 = case Status of
                                   idle ->
                                       State_2#state{status     = get_header,
-                                                    cur_req    = NewReq,
-                                                    send_timer = Ref};
+                                                    cur_req    = NewReq};
                                   _ ->
-                                      State_2#state{send_timer = Ref}
+                                      State_2
                               end,
                     case StreamTo of
                         undefined ->
@@ -987,13 +994,17 @@ chunk_request_body(Body, _ChunkSize, Acc) when is_list(Body) ->
     lists:reverse(["\r\n", LastChunk, Chunk | Acc]).
 
 
-parse_response(_Data, #state{cur_req = undefined}=State) ->
+parse_response(<<>>, #state{cur_req = undefined}=State) ->
     State#state{status = idle};
+parse_response(Data, #state{cur_req = undefined}) ->
+    do_trace("Data left to process when no pending request. ~1000.p~n", [Data]),
+    {error, data_in_status_idle};
+
 parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs,
                             cur_req = CurReq} = State) ->
     #request{from=From, stream_to=StreamTo, req_id=ReqId,
              method=Method, response_format = Resp_format,
-             options = Options
+             options = Options, timer_ref = T_ref
             } = CurReq,
     MaxHeaderSize = ibrowse:get_config_value(max_headers_size, infinity),
     case scan_header(Acc, Data) of
@@ -1005,47 +1016,55 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs,
             LCHeaders = [{to_lower(X), Y} || {X,Y} <- Headers_1],
             ConnClose = to_lower(get_value("connection", LCHeaders, "false")),
             IsClosing = is_connection_closing(HttpVsn, ConnClose),
-            case IsClosing of
-                true ->
-                    shutting_down(State);
-                false ->
-                    ok
-            end,
+            State_0 = case IsClosing of
+                          true ->
+                              shutting_down(State),
+                              State#state{is_closing = IsClosing};
+                          false ->
+                              State
+                      end,
             Give_raw_headers = get_value(give_raw_headers, Options, false),
             State_1 = case Give_raw_headers of
                           true ->
-                              State#state{recvd_headers=Headers_1, status=get_body,
-                                          reply_buffer = <<>>,
-                                          status_line = Status_line,
-                                          raw_headers = Raw_headers,
-                                          http_status_code=StatCode, is_closing=IsClosing};
+                              State_0#state{recvd_headers=Headers_1, status=get_body,
+                                            reply_buffer = <<>>,
+                                            status_line = Status_line,
+                                            raw_headers = Raw_headers,
+                                            http_status_code=StatCode};
                           false ->
-                              State#state{recvd_headers=Headers_1, status=get_body,
-                                          reply_buffer = <<>>,
-                                          http_status_code=StatCode, is_closing=IsClosing}
+                              State_0#state{recvd_headers=Headers_1, status=get_body,
+                                            reply_buffer = <<>>,
+                                            http_status_code=StatCode}
                       end,
             put(conn_close, ConnClose),
             TransferEncoding = to_lower(get_value("transfer-encoding", LCHeaders, "false")),
+            Head_response_with_body = lists:member({workaround, head_response_with_body}, Options),
             case get_value("content-length", LCHeaders, undefined) of
                 _ when Method == connect,
                        hd(StatCode) == $2 ->
-                    cancel_timer(State#state.send_timer),
                     {_, Reqs_1} = queue:out(Reqs),
-                    upgrade_to_ssl(set_cur_request(State#state{reqs = Reqs_1,
-                                                               recvd_headers = [],
-                                                               status = idle
-                                                              }));
+                    cancel_timer(T_ref),
+                    upgrade_to_ssl(set_cur_request(State_0#state{reqs = Reqs_1,
+                                                                 recvd_headers = [],
+                                                                 status = idle
+                                                                }));
                 _ when Method == connect ->
                     {_, Reqs_1} = queue:out(Reqs),
                     do_error_reply(State#state{reqs = Reqs_1},
                                    {error, proxy_tunnel_failed}),
                     {error, proxy_tunnel_failed};
-                _ when Method == head ->
+                _ when Method =:= head,
+                       Head_response_with_body =:= false ->
+                    %% This (HEAD response with body) is not supposed
+                    %% to happen, but it does. An Apache server was
+                    %% observed to send an "empty" body, but in a
+                    %% Chunked-Transfer-Encoding way, which meant
+                    %% there was still a body.  Issue #67 on Github
                     {_, Reqs_1} = queue:out(Reqs),
                     send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1),
                     State_1_1 = do_reply(State_1, From, StreamTo, ReqId, Resp_format,
                                          {ok, StatCode, Headers_1, []}),
-                    cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}),
+                    cancel_timer(T_ref, {eat_message, {req_timedout, From}}),
                     State_2 = reset_state(State_1_1),
                     State_3 = set_cur_request(State_2#state{reqs = Reqs_1}),
                     parse_response(Data_1, State_3);
@@ -1065,7 +1084,7 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs,
                     send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1),
                     State_1_1 = do_reply(State_1, From, StreamTo, ReqId, Resp_format,
                                          {ok, StatCode, Headers_1, []}),
-                    cancel_timer(State_1_1#state.send_timer, {eat_message, {req_timedout, From}}),
+                    cancel_timer(T_ref, {eat_message, {req_timedout, From}}),
                     State_2 = reset_state(State_1_1),
                     State_3 = set_cur_request(State_2#state{reqs = Reqs_1}),
                     parse_response(Data_1, State_3);
@@ -1084,7 +1103,7 @@ parse_response(Data, #state{reply_buffer = Acc, reqs = Reqs,
                             State_2
                     end;
                 undefined when HttpVsn =:= "HTTP/1.0";
-                ConnClose =:= "close" ->
+                               ConnClose =:= "close" ->
                     send_async_headers(ReqId, StreamTo, Give_raw_headers, State_1),
                     State_1#state{reply_buffer = Data_1};
                 undefined ->
@@ -1291,12 +1310,12 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
                          save_response_to_file = SaveResponseToFile,
                          tmp_file_name = TmpFilename,
                          tmp_file_fd = Fd,
-                         options       = Options
+                         options       = Options,
+                         timer_ref     = ReqTimer
                         },
                 #state{http_status_code = SCode,
                        status_line   = Status_line,
                        raw_headers   = Raw_headers,
-                       send_timer    = ReqTimer,
                        reply_buffer  = RepBuf,
                        recvd_headers = RespHeaders}=State) when SaveResponseToFile /= false ->
     Body = RepBuf,
@@ -1324,13 +1343,13 @@ handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
     set_cur_request(State_1);
 handle_response(#request{from=From, stream_to=StreamTo, req_id=ReqId,
                          response_format = Resp_format,
-                         options = Options},
+                         options = Options, timer_ref = ReqTimer},
                 #state{http_status_code = SCode,
                        status_line      = Status_line,
                        raw_headers      = Raw_headers,
                        recvd_headers    = Resp_headers,
-                       reply_buffer     = RepBuf,
-                       send_timer       = ReqTimer} = State) ->
+                       reply_buffer     = RepBuf
+                      } = State) ->
     Body = RepBuf,
     {Resp_headers_1, Raw_headers_1} = maybe_add_custom_headers(Resp_headers, Raw_headers, Options),
     Reply = case get_value(give_raw_headers, Options, false) of
@@ -1360,10 +1379,10 @@ reset_state(State) ->
                }.
 
 set_cur_request(#state{reqs = Reqs, socket = Socket} = State) ->
-    case queue:to_list(Reqs) of
-        [] ->
+    case queue:peek(Reqs) of
+        empty ->
             State#state{cur_req = undefined};
-        [#request{caller_controls_socket = Ccs} = NextReq | _] ->
+        {value, #request{caller_controls_socket = Ccs} = NextReq} ->
             case Ccs of
                 true ->
                     do_setopts(Socket, [{active, once}], State);
@@ -1410,6 +1429,11 @@ parse_headers_1([$\n, H |T], [$\r | L], Acc) when H =:= 32;
     parse_headers_1(lists:dropwhile(fun(X) ->
                                             is_whitespace(X)
                                     end, T), [32 | L], Acc);
+parse_headers_1([$\n, H |T], L, Acc) when H =:= 32;
+                                          H =:= $\t ->
+    parse_headers_1(lists:dropwhile(fun(X) ->
+                                            is_whitespace(X)
+                                    end, T), [32 | L], Acc);
 parse_headers_1([$\n|T], [$\r | L], Acc) ->
     case parse_header(lists:reverse(L)) of
         invalid ->
@@ -1417,6 +1441,13 @@ parse_headers_1([$\n|T], [$\r | L], Acc) ->
         NewHeader ->
             parse_headers_1(T, [], [NewHeader | Acc])
     end;
+parse_headers_1([$\n|T], L, Acc) ->
+    case parse_header(lists:reverse(L)) of
+        invalid ->
+            parse_headers_1(T, [], Acc);
+        NewHeader ->
+            parse_headers_1(T, [], [NewHeader | Acc])
+    end;
 parse_headers_1([H|T],  L, Acc) ->
     parse_headers_1(T, [H|L], Acc);
 parse_headers_1([], [], Acc) ->
@@ -1458,10 +1489,13 @@ parse_header([], _) ->
     invalid.
 
 scan_header(Bin) ->
-    case get_crlf_crlf_pos(Bin) of
+    case get_crlf_crlf_pos(Bin, 0) of
         {yes, Pos} ->
             {Headers, <<_:4/binary, Body/binary>>} = split_binary(Bin, Pos),
             {yes, Headers, Body};
+        {yes_dodgy, Pos} ->
+            {Headers, <<_:2/binary, Body/binary>>} = split_binary(Bin, Pos),
+            {yes, Headers, Body};
         no ->
             {no, Bin}
     end.
@@ -1474,29 +1508,26 @@ scan_header(Bin1, Bin2) ->
     Bin1_already_scanned_size = size(Bin1) - 4,
     <<Headers_prefix:Bin1_already_scanned_size/binary, Rest/binary>> = Bin1,
     Bin_to_scan = <<Rest/binary, Bin2/binary>>,
-    case get_crlf_crlf_pos(Bin_to_scan) of
+    case get_crlf_crlf_pos(Bin_to_scan, 0) of
         {yes, Pos} ->
             {Headers_suffix, <<_:4/binary, Body/binary>>} = split_binary(Bin_to_scan, Pos),
             {yes, <<Headers_prefix/binary, Headers_suffix/binary>>, Body};
+        {yes_dodgy, Pos} ->
+            {Headers_suffix, <<_:2/binary, Body/binary>>} = split_binary(Bin_to_scan, Pos),
+            {yes, <<Headers_prefix/binary, Headers_suffix/binary>>, Body};
         no ->
             {no, <<Bin1/binary, Bin2/binary>>}
     end.
 
-get_crlf_crlf_pos(Data) ->
-    binary_bif_match(Data, <<$\r, $\n, $\r, $\n>>).
-
-binary_bif_match(Data, Binary) ->
-    case binary:match(Data, Binary) of
-    {Pos, _Len} ->
-        {yes, Pos};
-    _ -> no
-    end.
-
+get_crlf_crlf_pos(<<$\r, $\n, $\r, $\n, _/binary>>, Pos) -> {yes, Pos};
+get_crlf_crlf_pos(<<$\n, $\n, _/binary>>, Pos)           -> {yes_dodgy, Pos};
+get_crlf_crlf_pos(<<_, Rest/binary>>, Pos)               -> get_crlf_crlf_pos(Rest, Pos + 1);
+get_crlf_crlf_pos(<<>>, _)                               -> no.
 
 scan_crlf(Bin) ->
     case get_crlf_pos(Bin) of
-        {yes, Pos} ->
-            {Prefix, <<_, _, Suffix/binary>>} = split_binary(Bin, Pos),
+        {yes, Offset, Pos} ->
+            {Prefix, <<_:Offset/binary, Suffix/binary>>} = split_binary(Bin, Pos),
             {yes, Prefix, Suffix};
         no ->
             {no, Bin}
@@ -1513,16 +1544,20 @@ scan_crlf_1(Bin1_head_size, Bin1, Bin2) ->
     <<Bin1_head:Bin1_head_size/binary, Bin1_tail/binary>> = Bin1,
     Bin3 = <<Bin1_tail/binary, Bin2/binary>>,
     case get_crlf_pos(Bin3) of
-        {yes, Pos} ->
-            {Prefix, <<_, _, Suffix/binary>>} = split_binary(Bin3, Pos),
+        {yes, Offset, Pos} ->
+            {Prefix, <<_:Offset/binary, Suffix/binary>>} = split_binary(Bin3, Pos),
             {yes, list_to_binary([Bin1_head, Prefix]), Suffix};
         no ->
             {no, list_to_binary([Bin1, Bin2])}
     end.
 
-get_crlf_pos(Data) ->
-    binary_bif_match(Data, <<$\r, $\n>>).
+get_crlf_pos(Bin) ->
+    get_crlf_pos(Bin, 0).
 
+get_crlf_pos(<<$\r, $\n, _/binary>>, Pos) -> {yes, 2, Pos};
+get_crlf_pos(<<$\n, _/binary>>, Pos) ->      {yes, 1, Pos};
+get_crlf_pos(<<_, Rest/binary>>, Pos)     -> get_crlf_pos(Rest, Pos + 1);
+get_crlf_pos(<<>>, _)                     -> no.
 
 fmt_val(L) when is_list(L)    -> L;
 fmt_val(I) when is_integer(I) -> integer_to_list(I);
@@ -1531,21 +1566,36 @@ fmt_val(Term)                 -> io_lib:format("~p", [Term]).
 
 crnl() -> "\r\n".
 
-method(get)       -> "GET";
-method(post)      -> "POST";
-method(head)      -> "HEAD";
-method(options)   -> "OPTIONS";
-method(put)       -> "PUT";
-method(delete)    -> "DELETE";
-method(trace)     -> "TRACE";
-method(mkcol)     -> "MKCOL";
-method(propfind)  -> "PROPFIND";
-method(proppatch) -> "PROPPATCH";
-method(lock)      -> "LOCK";
-method(unlock)    -> "UNLOCK";
-method(move)      -> "MOVE";
-method(copy)      -> "COPY";
-method(connect)   -> "CONNECT".
+method(connect)     -> "CONNECT";
+method(delete)      -> "DELETE";
+method(get)         -> "GET";
+method(head)        -> "HEAD";
+method(options)     -> "OPTIONS";
+method(post)        -> "POST";
+method(put)         -> "PUT";
+method(trace)       -> "TRACE";
+%% webdav
+method(copy)        -> "COPY";
+method(lock)        -> "LOCK";
+method(mkcol)       -> "MKCOL";
+method(move)        -> "MOVE";
+method(propfind)    -> "PROPFIND";
+method(proppatch)   -> "PROPPATCH";
+method(search)      -> "SEARCH";
+method(unlock)      -> "UNLOCK";
+%% subversion %%
+method(report)      -> "REPORT";
+method(mkactivity)  -> "MKACTIVITY";
+method(checkout)    -> "CHECKOUT";
+method(merge)       -> "MERGE";
+%% upnp
+method(msearch)     -> "MSEARCH";
+method(notify)      -> "NOTIFY";
+method(subscribe)   -> "SUBSCRIBE";
+method(unsubscribe) -> "UNSUBSCRIBE";
+%% rfc-5789
+method(patch)       -> "PATCH";
+method(purge)       -> "PURGE".
 
 %% From RFC 2616
 %%
@@ -1768,22 +1818,34 @@ to_lower([], Acc) ->
 shutting_down(#state{lb_ets_tid = undefined}) ->
     ok;
 shutting_down(#state{lb_ets_tid = Tid,
-                     cur_pipeline_size = Sz}) ->
-    catch ets:delete(Tid, {Sz, self()}).
+                     cur_pipeline_size = _Sz}) ->
+    catch ets:delete(Tid, self()).
 
 inc_pipeline_counter(#state{is_closing = true} = State) ->
     State;
-inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz} = State) ->
+inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
+    State;
+inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz,
+                           lb_ets_tid = Tid} = State) ->
+    update_counter(Tid, self(), {2,1,99999,9999}),
     State#state{cur_pipeline_size = Pipe_sz + 1}.
 
+update_counter(Tid, Key, Args) ->
+    ets:update_counter(Tid, Key, Args).
+
 dec_pipeline_counter(#state{is_closing = true} = State) ->
     State;
 dec_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
     State;
 dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz,
                             lb_ets_tid = Tid} = State) ->
-    ets:delete(Tid, {Pipe_sz, self()}),
-    ets:insert(Tid, {{Pipe_sz - 1, self()}, []}),
+    try
+        update_counter(Tid, self(), {2,-1,0,0}),
+        update_counter(Tid, self(), {3,-1,0,0})
+    catch
+        _:_ ->
+            ok
+    end,
     State#state{cur_pipeline_size = Pipe_sz - 1}.
 
 flatten([H | _] = L) when is_integer(H) ->

http://git-wip-us.apache.org/repos/asf/couchdb/blob/38e0fb2e/src/ibrowse/ibrowse_lb.erl
----------------------------------------------------------------------
diff --git a/src/ibrowse/ibrowse_lb.erl b/src/ibrowse/ibrowse_lb.erl
index 0e001d4..d98cf32 100644
--- a/src/ibrowse/ibrowse_lb.erl
+++ b/src/ibrowse/ibrowse_lb.erl
@@ -36,7 +36,9 @@
 		port,
 		max_sessions,
 		max_pipeline_size,
-		num_cur_sessions = 0}).
+		num_cur_sessions = 0,
+                proc_state
+               }).
 
 -include("ibrowse.hrl").
 
@@ -104,14 +106,21 @@ stop(Lb_pid) ->
 %%          {stop, Reason, Reply, State}   | (terminate/2 is called)
 %%          {stop, Reason, State}            (terminate/2 is called)
 %%--------------------------------------------------------------------
-% handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _}, _From,
-% 	    #state{max_sessions = Max_sess,
-% 		   ets_tid = Tid,
-% 		   max_pipeline_size = Max_pipe_sz,
-% 		   num_cur_sessions = Num} = State) 
-%     when Num >= Max ->
-%     Reply = find_best_connection(Tid),
-%     {reply, sorry_dude_reuse, State};
+
+handle_call(stop, _From, #state{ets_tid = undefined} = State) ->
+    gen_server:reply(_From, ok),
+    {stop, normal, State};
+
+handle_call(stop, _From, #state{ets_tid = Tid} = State) ->
+    ets:foldl(fun({Pid, _, _}, Acc) ->
+                      ibrowse_http_client:stop(Pid),
+                      Acc
+              end, [], Tid),
+    gen_server:reply(_From, ok),
+    {stop, normal, State};
+
+handle_call(_, _From, #state{proc_state = shutting_down} = State) ->
+    {reply, {error, shutting_down}, State};
 
 %% Update max_sessions in #state with supplied value
 handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _}, _From,
@@ -119,27 +128,18 @@ handle_call({spawn_connection, _Url, Max_sess, Max_pipe, _}, _From,
     when Num >= Max_sess ->
     State_1 = maybe_create_ets(State),
     Reply = find_best_connection(State_1#state.ets_tid, Max_pipe),
-    {reply, Reply, State_1#state{max_sessions = Max_sess}};
+    {reply, Reply, State_1#state{max_sessions = Max_sess,
+                                 max_pipeline_size = Max_pipe}};
 
-handle_call({spawn_connection, Url, _Max_sess, _Max_pipe, SSL_options}, _From,
+handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options}, _From,
 	    #state{num_cur_sessions = Cur} = State) ->
     State_1 = maybe_create_ets(State),
     Tid = State_1#state.ets_tid,
     {ok, Pid} = ibrowse_http_client:start_link({Tid, Url, SSL_options}),
-    ets:insert(Tid, {{1, Pid}, []}),
-    {reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1}};
-
-handle_call(stop, _From, #state{ets_tid = undefined} = State) ->
-    gen_server:reply(_From, ok),
-    {stop, normal, State};
-
-handle_call(stop, _From, #state{ets_tid = Tid} = State) ->
-    ets:foldl(fun({{_, Pid}, _}, Acc) ->
-                      ibrowse_http_client:stop(Pid),
-                      Acc
-              end, [], Tid),
-    gen_server:reply(_From, ok),
-    {stop, normal, State};
+    ets:insert(Tid, {Pid, 0, 0}),
+    {reply, {ok, Pid}, State_1#state{num_cur_sessions = Cur + 1,
+                                     max_sessions = Max_sess,
+                                     max_pipeline_size = Max_pipe}};
 
 handle_call(Request, _From, State) ->
     Reply = {unknown_request, Request},
@@ -173,14 +173,13 @@ handle_info({'EXIT', Pid, _Reason},
 		   ets_tid = Tid} = State) ->
     ets:match_delete(Tid, {{'_', Pid}, '_'}),
     Cur_1 = Cur - 1,
-    State_1 = case Cur_1 of
+    case Cur_1 of
 		  0 ->
 		      ets:delete(Tid),
-		      State#state{ets_tid = undefined};
+			  {noreply, State#state{ets_tid = undefined, num_cur_sessions = 0}, 10000};
 		  _ ->
-		      State
-	      end,
-    {noreply, State_1#state{num_cur_sessions = Cur_1}};
+		      {noreply, State#state{num_cur_sessions = Cur_1}}
+	      end;
 
 handle_info({trace, Bool}, #state{ets_tid = undefined} = State) ->
     put(my_trace_flag, Bool),
@@ -196,6 +195,18 @@ handle_info({trace, Bool}, #state{ets_tid = Tid} = State) ->
     put(my_trace_flag, Bool),
     {noreply, State};
 
+handle_info(timeout, State) ->
+    %% We can't shutdown the process immediately because a request
+    %% might be in flight. So we first remove the entry from the
+    %% ibrowse_lb ets table, and then shutdown a couple of seconds
+    %% later
+    ets:delete(ibrowse_lb, {State#state.host, State#state.port}),
+    erlang:send_after(2000, self(), shutdown),
+    {noreply, State#state{proc_state = shutting_down}};
+
+handle_info(shutdown, State) ->
+    {stop, normal, State};
+
 handle_info(_Info, State) ->
     {noreply, State}.
 
@@ -219,13 +230,19 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal functions
 %%--------------------------------------------------------------------
 find_best_connection(Tid, Max_pipe) ->
-    case ets:first(Tid) of
-	{Cur_sz, Pid} when Cur_sz < Max_pipe ->
-	    ets:delete(Tid, {Cur_sz, Pid}),
-	    ets:insert(Tid, {{Cur_sz + 1, Pid}, []}),
-	    {ok, Pid};
-	_ ->
-	    {error, retry_later}
+    Res = find_best_connection(ets:first(Tid), Tid, Max_pipe),
+    Res.
+
+find_best_connection('$end_of_table', _, _) ->
+    {error, retry_later};
+find_best_connection(Pid, Tid, Max_pipe) ->
+    case ets:lookup(Tid, Pid) of
+        [{Pid, Cur_sz, Speculative_sz}] when Cur_sz < Max_pipe,
+                                             Speculative_sz < Max_pipe ->
+            ets:update_counter(Tid, Pid, {3, 1, 9999999, 9999999}),
+            {ok, Pid};
+        _ ->
+            find_best_connection(ets:next(Tid, Pid), Tid, Max_pipe)
     end.
 
 maybe_create_ets(#state{ets_tid = undefined} = State) ->

http://git-wip-us.apache.org/repos/asf/couchdb/blob/38e0fb2e/src/ibrowse/ibrowse_lib.erl
----------------------------------------------------------------------
diff --git a/src/ibrowse/ibrowse_lib.erl b/src/ibrowse/ibrowse_lib.erl
index 3cbe3ac..1ce6bd4 100644
--- a/src/ibrowse/ibrowse_lib.erl
+++ b/src/ibrowse/ibrowse_lib.erl
@@ -12,6 +12,10 @@
 
 -include("ibrowse.hrl").
 
+-ifdef(EUNIT).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
 -export([
          get_trace_status/2,
          do_trace/2,
@@ -180,18 +184,24 @@ get_value(Tag, TVL) ->
     V.
 
 parse_url(Url) ->
-    case parse_url(Url, get_protocol, #url{abspath=Url}, []) of
-        #url{host_type = undefined, host = Host} = UrlRec ->
-            case inet_parse:address(Host) of
-                {ok, {_, _, _, _, _, _, _, _}} ->
-                    UrlRec#url{host_type = ipv6_address};
-                {ok, {_, _, _, _}} ->
-                    UrlRec#url{host_type = ipv4_address};
-                _ ->
-                    UrlRec#url{host_type = hostname}
-            end;
-        Else ->
-            Else
+    try
+        case parse_url(Url, get_protocol, #url{abspath=Url}, []) of
+            #url{host_type = undefined, host = Host} = UrlRec ->
+                case inet_parse:address(Host) of
+                    {ok, {_, _, _, _, _, _, _, _}} ->
+                        UrlRec#url{host_type = ipv6_address};
+                    {ok, {_, _, _, _}} ->
+                        UrlRec#url{host_type = ipv4_address};
+                    _ ->
+                        UrlRec#url{host_type = hostname}
+                end;
+            #url{} = UrlRec ->
+                UrlRec;
+            _ ->
+                {error, invalid_uri}
+        end
+    catch _:_ ->
+            {error, invalid_uri}
     end.
 
 parse_url([$:, $/, $/ | _], get_protocol, Url, []) ->
@@ -389,3 +399,43 @@ do_trace(true, Fmt, Args) ->
 do_trace(_, _, _) ->
     ok.
 -endif.
+
+-ifdef(EUNIT).
+
+parse_url_test() ->
+    Urls = [{"http://[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]:80/index.html",
+             #url{abspath = "http://[FEDC:BA98:7654:3210:FEDC:BA98:7654:3210]:80/index.html",
+                  host = "FEDC:BA98:7654:3210:FEDC:BA98:7654:3210",
+                  port = 80, protocol = http, path = "/index.html",
+                  host_type = ipv6_address}},
+            {"http://[1080:0:0:0:8:800:200C:417A]/index.html",
+             #url{abspath = "http://[1080:0:0:0:8:800:200C:417A]/index.html",
+                  host_type = ipv6_address, port = 80, protocol = http,
+                  host = "1080:0:0:0:8:800:200C:417A", path = "/index.html"}},
+            {"http://[3ffe:2a00:100:7031::1]",
+             #url{abspath = "http://[3ffe:2a00:100:7031::1]",
+                  host_type = ipv6_address, port = 80, protocol = http,
+                  host = "3ffe:2a00:100:7031::1", path = "/"}},
+            {"http://[1080::8:800:200C:417A]/foo",
+             #url{abspath = "http://[1080::8:800:200C:417A]/foo",
+                  host_type = ipv6_address, port = 80, protocol = http,
+                  host = "1080::8:800:200C:417A", path = "/foo"}},
+            {"http://[::192.9.5.5]/ipng",
+             #url{abspath = "http://[::192.9.5.5]/ipng",
+                  host_type = ipv6_address, port = 80, protocol = http,
+                  host = "::192.9.5.5", path = "/ipng"}},
+            {"http://[::FFFF:129.144.52.38]:80/index.html",
+             #url{abspath = "http://[::FFFF:129.144.52.38]:80/index.html",
+                  host_type = ipv6_address, port = 80, protocol = http,
+                  host = "::FFFF:129.144.52.38", path = "/index.html"}},
+            {"http://[2010:836B:4179::836B:4179]",
+             #url{abspath = "http://[2010:836B:4179::836B:4179]",
+                  host_type = ipv6_address, port = 80, protocol = http,
+                  host = "2010:836B:4179::836B:4179", path = "/"}}
+           ],
+    lists:foreach(
+      fun({Url, Expected_result}) ->
+              ?assertMatch(Expected_result, parse_url(Url))
+      end, Urls).
+
+-endif.

http://git-wip-us.apache.org/repos/asf/couchdb/blob/38e0fb2e/src/ibrowse/ibrowse_test.erl
----------------------------------------------------------------------
diff --git a/src/ibrowse/ibrowse_test.erl b/src/ibrowse/ibrowse_test.erl
index ff3b530..d97f76c 100644
--- a/src/ibrowse/ibrowse_test.erl
+++ b/src/ibrowse/ibrowse_test.erl
@@ -20,7 +20,14 @@
 	 test_stream_once/3,
 	 test_stream_once/4,
          test_20122010/0,
-         test_20122010/1
+         test_20122010/1,
+         test_pipeline_head_timeout/0,
+         test_pipeline_head_timeout/1,
+         do_test_pipeline_head_timeout/4,
+         test_head_transfer_encoding/0,
+         test_head_transfer_encoding/1,
+         test_head_response_with_body/0,
+         test_head_response_with_body/1
 	]).
 
 test_stream_once(Url, Method, Options) ->
@@ -81,7 +88,7 @@ send_reqs_1(Url, NumWorkers, NumReqsPerWorker) ->
     log_msg("Starting spawning of workers...~n", []),
     spawn_workers(Url, NumWorkers, NumReqsPerWorker),
     log_msg("Finished spawning workers...~n", []),
-    do_wait(),
+    do_wait(Url),
     End_time = now(),
     log_msg("All workers are done...~n", []),
     log_msg("ibrowse_test_results table: ~n~p~n", [ets:tab2list(ibrowse_test_results)]),
@@ -111,24 +118,28 @@ spawn_workers(Url, NumWorkers, NumReqsPerWorker) ->
     ets:insert(pid_table, {Pid, []}),
     spawn_workers(Url, NumWorkers - 1, NumReqsPerWorker).
 
-do_wait() ->
+do_wait(Url) ->
     receive
 	{'EXIT', _, normal} ->
-	    do_wait();
+            catch ibrowse:show_dest_status(Url),
+            catch ibrowse:show_dest_status(),
+	    do_wait(Url);
 	{'EXIT', Pid, Reason} ->
 	    ets:delete(pid_table, Pid),
 	    ets:insert(ibrowse_errors, {Pid, Reason}),
 	    ets:update_counter(ibrowse_test_results, crash, 1),
-	    do_wait();
+	    do_wait(Url);
 	Msg ->
 	    io:format("Recvd unknown message...~p~n", [Msg]),
-	    do_wait()
+	    do_wait(Url)
     after 1000 ->
 	    case ets:info(pid_table, size) of
 		0 ->
 		    done;
 		_ ->
-		    do_wait()
+                    catch ibrowse:show_dest_status(Url),
+                    catch ibrowse:show_dest_status(),
+		    do_wait(Url)
 	    end
     end.
 
@@ -219,7 +230,10 @@ dump_errors(Key, Iod) ->
 		    {"http://jigsaw.w3.org/HTTP/CL/", get},
 		    {"http://www.httpwatch.com/httpgallery/chunked/", get},
                     {"https://github.com", get, [{ssl_options, [{depth, 2}]}]},
-                    {local_test_fun, test_20122010, []}
+                    {local_test_fun, test_20122010, []},
+                    {local_test_fun, test_pipeline_head_timeout, []},
+                    {local_test_fun, test_head_transfer_encoding, []},
+                    {local_test_fun, test_head_response_with_body, []}
 		   ]).
 
 unit_tests() ->
@@ -232,16 +246,19 @@ unit_tests(Options) ->
     (catch ibrowse_test_server:start_server(8181, tcp)),
     ibrowse:start(),
     Options_1 = Options ++ [{connect_timeout, 5000}],
+    Test_timeout = proplists:get_value(test_timeout, Options, 60000),
     {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1]),
     receive 
 	{done, Pid} ->
 	    ok;
 	{'DOWN', Ref, _, _, Info} ->
 	    io:format("Test process crashed: ~p~n", [Info])
-    after 60000 ->
+    after Test_timeout ->
 	    exit(Pid, kill),
 	    io:format("Timed out waiting for tests to complete~n", [])
-    end.
+    end,
+    catch ibrowse_test_server:stop_server(8181),
+    ok.
 
 unit_tests_1(Parent, Options) ->
     lists:foreach(fun({local_test_fun, Fun_name, Args}) ->
@@ -426,6 +443,101 @@ log_msg(Fmt, Args) ->
 	      [ibrowse_lib:printable_date() | Args]).
 
 %%------------------------------------------------------------------------------
+%% Test what happens when the response to a HEAD request is a
+%% Chunked-Encoding response with a non-empty body. Issue #67 on
+%% Github
+%% ------------------------------------------------------------------------------
+test_head_transfer_encoding() ->
+    clear_msg_q(),
+    test_head_transfer_encoding("http://localhost:8181/ibrowse_head_test").
+
+test_head_transfer_encoding(Url) ->
+    case ibrowse:send_req(Url, [], head) of
+        {ok, "200", _, _} ->
+            success;
+        Res ->
+            {test_failed, Res}
+    end.
+
+%%------------------------------------------------------------------------------
+%% Test what happens when the response to a HEAD request is a
+%% Chunked-Encoding response with a non-empty body. Issue #67 on
+%% Github
+%% ------------------------------------------------------------------------------
+test_head_response_with_body() ->
+    clear_msg_q(),
+    test_head_response_with_body("http://localhost:8181/ibrowse_head_transfer_enc").
+
+test_head_response_with_body(Url) ->
+    case ibrowse:send_req(Url, [], head, [], [{workaround, head_response_with_body}]) of
+        {ok, "400", _, _} ->
+            success;
+        Res ->
+            {test_failed, Res}
+    end.
+
+%%------------------------------------------------------------------------------
+%% Test what happens when the request at the head of a pipeline times out
+%%------------------------------------------------------------------------------
+test_pipeline_head_timeout() ->
+    clear_msg_q(),
+    test_pipeline_head_timeout("http://localhost:8181/ibrowse_inac_timeout_test").
+
+test_pipeline_head_timeout(Url) ->
+    {ok, Pid} = ibrowse:spawn_worker_process(Url),
+    Test_parent = self(),
+    Fun = fun({fixed, Timeout}) ->
+                  spawn(fun() ->
+                                do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout)
+                        end);
+             (Timeout_mult) ->
+                  spawn(fun() ->
+                                Timeout = 1000 + Timeout_mult*1000,
+                                do_test_pipeline_head_timeout(Url, Pid, Test_parent, Timeout)
+                        end)
+          end,
+    Pids = [Fun(X) || X <- [{fixed, 32000} | lists:seq(1,10)]],
+    Result = accumulate_worker_resp(Pids),
+    case lists:all(fun({_, X_res}) ->
+                           X_res == {error,req_timedout}
+                   end, Result) of
+        true ->
+            success;
+        false ->
+            {test_failed, Result}
+    end.
+
+do_test_pipeline_head_timeout(Url, Pid, Test_parent, Req_timeout) ->
+    Resp = ibrowse:send_req_direct(
+                                 Pid,
+                                 Url,
+                                 [], get, [],
+                                 [{socket_options,[{keepalive,true}]},
+                                  {inactivity_timeout,180000},
+                                  {connect_timeout,180000}], Req_timeout),
+    Test_parent ! {self(), Resp}.
+
+accumulate_worker_resp(Pids) ->
+    accumulate_worker_resp(Pids, []).
+
+accumulate_worker_resp([_ | _] = Pids, Acc) ->
+    receive
+        {Pid, Res} when is_pid(Pid) ->
+            accumulate_worker_resp(Pids -- [Pid], [{Pid, Res} | Acc]);
+        Err ->
+            io:format("Received unexpected: ~p~n", [Err])
+    end;
+accumulate_worker_resp([], Acc) ->
+    lists:reverse(Acc).
+
+clear_msg_q() ->
+    receive
+        _ ->
+            clear_msg_q()
+    after 0 ->
+            ok
+    end.
+%%------------------------------------------------------------------------------
 %% 
 %%------------------------------------------------------------------------------