You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@couchdb.apache.org by rn...@apache.org on 2016/08/22 11:19:48 UTC

[22/50] ibrowse commit: updated refs/heads/upstream to b28542d

Merge branch 'improve_pipeline_balance' of https://github.com/benjaminplee/ibrowse into merge_pull_req_123


Project: http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/repo
Commit: http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/commit/8494e943
Tree: http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/tree/8494e943
Diff: http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/diff/8494e943

Branch: refs/heads/upstream
Commit: 8494e9433f1e74f9ab43aaa23039ff3ccedadb55
Parents: d61dd9a 37fce82
Author: Chandrashekhar Mullaparthi <ch...@gmail.com>
Authored: Mon Sep 28 08:19:59 2015 +0100
Committer: Chandrashekhar Mullaparthi <ch...@gmail.com>
Committed: Mon Sep 28 08:19:59 2015 +0100

----------------------------------------------------------------------
 .gitignore                        |   1 +
 CONTRIBUTORS                      |   3 +
 Makefile                          |   6 +-
 include/ibrowse.hrl               |   5 +
 rebar                             | Bin 90778 -> 188026 bytes
 src/ibrowse.erl                   |  11 ++-
 src/ibrowse_http_client.erl       |   2 +-
 src/ibrowse_lb.erl                |  27 +++--
 src/ibrowse_lib.erl               |   1 +
 src/ibrowse_socks5.erl            | 131 ++++++++++++++++++-------
 test/ibrowse_functional_tests.erl | 174 +++++++++++++++++++++++++++++++++
 test/ibrowse_test.erl             |  20 ++--
 test/ibrowse_test_server.erl      |  97 +++++++++++-------
 13 files changed, 376 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/blob/8494e943/Makefile
----------------------------------------------------------------------
diff --cc Makefile
index b596b64,b596b64..28dfda8
--- a/Makefile
+++ b/Makefile
@@@ -15,9 -15,9 +15,11 @@@ install: compil
  	mkdir -p $(DESTDIR)/lib/ibrowse-$(IBROWSE_VSN)/
  	cp -r ebin $(DESTDIR)/lib/ibrowse-$(IBROWSE_VSN)/
  
--test: all
++eunit_test: all
  	./rebar eunit
--	erl -noshell -pa .eunit -pa test -s ibrowse -s ibrowse_test unit_tests \
++
++test: all
++	erl -noshell -pa test -pa ebin -s ibrowse_test unit_tests \
  	-s ibrowse_test verify_chunked_streaming \
  	-s ibrowse_test test_chunked_streaming_once \
  	-s erlang halt

http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/blob/8494e943/include/ibrowse.hrl
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/blob/8494e943/src/ibrowse.erl
----------------------------------------------------------------------
diff --cc src/ibrowse.erl
index fbb4b83,951cfe1..51fcb86
--- a/src/ibrowse.erl
+++ b/src/ibrowse.erl
@@@ -651,7 -619,7 +651,7 @@@ show_dest_status() -
      io:format("~80.80.=s~n", [""]),
      Metrics = get_metrics(),
      lists:foreach(
--      fun({Host, Port, Lb_pid, Tid, Size}) ->
++      fun({Host, Port, {Lb_pid, _, Tid, Size, _}}) ->
                io:format("~40.40s | ~-5.5s | ~-5.5s | ~p~n",
                          [Host ++ ":" ++ integer_to_list(Port),
                           integer_to_list(Tid),
@@@ -686,33 -654,43 +686,38 @@@ show_dest_status(Host, Port) -
      end.
  
  get_metrics() ->
 -    Dests = lists:filter(fun({lb_pid, {Host, Port}, _}) when is_list(Host),
 -                                                             is_integer(Port) ->
 -                                 true;
 -                            (_) ->
 -                                 false
 -                         end, ets:tab2list(?LOAD_BALANCER_NAMED_TABLE)),
 -    All_ets = ets:all(),
 -    lists:map(fun({lb_pid, {Host, Port}, Lb_pid}) ->
 -                  case lists:dropwhile(
 -                         fun(Tid) ->
 -                                 ets:info(Tid, owner) /= Lb_pid
 -                         end, All_ets) of
 -                      [] ->
 -                          {Host, Port, Lb_pid, unknown, 0};
 -                      [Tid | _] ->
 -                          Size = case catch (ets:info(Tid, size)) of
 -                                     N when is_integer(N) -> N;
 -                                     _ -> 0
 -                                 end,
 -                          {Host, Port, Lb_pid, Tid, Size}
 -                  end
 -              end, Dests).
 +    Dests = lists:filter(
 +              fun(#lb_pid{host_port = {Host, Port}}) when is_list(Host),
 +                                                          is_integer(Port) ->
 +                      true;
 +                 (_) ->
 +                      false
 +              end, ets:tab2list(ibrowse_lb)),
 +    lists:foldl(
 +      fun(#lb_pid{host_port = {X_host, X_port}}, X_acc) ->
 +              case get_metrics(X_host, X_port) of
 +                  {_, _, _, _, _} = X_res ->
-                       [X_res | X_acc];
++                      [{X_host, X_port, X_res} | X_acc];
 +                  _X_res ->
 +                      X_acc
 +              end
 +      end, [], Dests).
  
  get_metrics(Host, Port) ->
 -    case ets:lookup(?LOAD_BALANCER_NAMED_TABLE, {Host, Port}) of
 +    case ets:lookup(ibrowse_lb, {Host, Port}) of
          [] ->
              no_active_processes;
 -        [#lb_pid{pid = Lb_pid}] ->
 -            MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)),
 -            %% {Lb_pid, MsgQueueSize,
 -            case lists:dropwhile(
 -                   fun(Tid) ->
 -                           ets:info(Tid, owner) /= Lb_pid
 -                   end, ets:all()) of
 -                [] ->
 -                    {Lb_pid, MsgQueueSize, unknown, 0, unknown};
 -                [Tid | _] ->
 +        [#lb_pid{pid = Lb_pid, ets_tid = Tid}] ->
-             MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)),
++            MsgQueueSize = case (catch process_info(Lb_pid, message_queue_len)) of
++			       {message_queue_len, Msg_q_len} ->
++				   Msg_q_len;
++			       _ ->
++				   -1
++			   end,
 +            case Tid of
 +                undefined ->
 +                    {Lb_pid, MsgQueueSize, undefined, 0, {{0, 0}, {0, 0}}};
 +                _ ->
                      try
                          Size = ets:info(Tid, size),
                          case Size of

http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/blob/8494e943/src/ibrowse_http_client.erl
----------------------------------------------------------------------
diff --cc src/ibrowse_http_client.erl
index db9559a,d92db42..92e4964
--- a/src/ibrowse_http_client.erl
+++ b/src/ibrowse_http_client.erl
@@@ -2005,30 -1944,15 +2005,30 @@@ to_lower([], Acc) -
  
  shutting_down(#state{lb_ets_tid = undefined}) ->
      ok;
 -shutting_down(#state{lb_ets_tid = Tid}) ->
 -    ibrowse_lb:report_connection_down(Tid).
 +shutting_down(#state{lb_ets_tid = Tid,
 +                     cur_pipeline_size = _Sz}) ->
 +    (catch ets:select_delete(Tid, [{{{'_', '_', '$1'},'_'},[{'==','$1',{const,self()}}],[true]}])).
  
 -report_request_complete(#state{is_closing = true} = State) ->
 +inc_pipeline_counter(#state{is_closing = true} = State) ->
      State;
 -report_request_complete(#state{lb_ets_tid = undefined} = State) ->
 +inc_pipeline_counter(#state{lb_ets_tid = undefined} = State) ->
      State;
 -report_request_complete(#state{lb_ets_tid = Tid} = State) ->
 -    ibrowse_lb:report_request_complete(Tid),
 +inc_pipeline_counter(#state{cur_pipeline_size = Pipe_sz} = State) ->
 +    State#state{cur_pipeline_size = Pipe_sz + 1}.
 +
 +dec_pipeline_counter(#state{cur_pipeline_size = Pipe_sz,
 +                            lb_ets_tid        = Tid,
 +                            proc_state        = Proc_state} = State) when Tid /= undefined,
 +                                                                          Proc_state /= ?dead_proc_walking ->
 +    Ts = os:timestamp(),
++    catch ets:insert(Tid, {{Pipe_sz - 1, os:timestamp(), self()}, []}),
 +    (catch ets:select_delete(Tid, [{{{'_', '$2', '$1'},'_'},
 +                                    [{'==', '$1', {const,self()}},
 +                                     {'<',  '$2', {const,Ts}}
 +                                    ],
 +                                    [true]}])),
-     catch ets:insert(Tid, {{Pipe_sz - 1, os:timestamp(), self()}, []}),
 +    State#state{cur_pipeline_size = Pipe_sz - 1};
 +dec_pipeline_counter(State) ->
      State.
  
  flatten([H | _] = L) when is_integer(H) ->

http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/blob/8494e943/src/ibrowse_lb.erl
----------------------------------------------------------------------
diff --cc src/ibrowse_lb.erl
index 88b169b,794ba45..894d8ad
--- a/src/ibrowse_lb.erl
+++ b/src/ibrowse_lb.erl
@@@ -119,23 -133,18 +119,23 @@@ handle_call(stop, _From, #state{ets_ti
  handle_call(_, _From, #state{proc_state = shutting_down} = State) ->
      {reply, {error, shutting_down}, State};
  
 -handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options, Process_options}, _From, State) ->
 -    State_1 = maybe_create_ets(State),
 -    Tid = State_1#state.ets_tid,
 -    Reply = case num_current_connections(Tid) of
 -        X when X >= Max_sess ->
 -            find_best_connection(Tid, Max_pipe);
 -        _ ->
 -            Result = {ok, Pid} = ibrowse_http_client:start_link({Tid, Url, SSL_options}, Process_options),
 -            record_new_connection(Tid, Pid),
 -            Result
 -    end,
 -    {reply, Reply, State_1#state{max_sessions = Max_sess, max_pipeline_size = Max_pipe}};
 +handle_call({spawn_connection, Url, Max_sess, Max_pipe, SSL_options, Process_options}, _From,
 +	    State) ->
 +    State_1   = maybe_create_ets(State),
 +    Tid       = State_1#state.ets_tid,
 +    Tid_size  = ets:info(Tid, size),
-     case Tid_size > Max_sess of
++    case Tid_size >= Max_sess of
 +        true ->
-             Reply = find_best_connection(Tid, Max_pipe, Tid_size),
++            Reply = find_best_connection(Tid, Max_pipe),
 +            {reply, Reply, State_1#state{max_sessions      = Max_sess,
 +                                         max_pipeline_size = Max_pipe}};
 +        false ->
 +            {ok, Pid} = ibrowse_http_client:start({Tid, Url, SSL_options}, Process_options),
 +            Ts = os:timestamp(),
-             ets:insert(Tid, {{0, Ts, Pid}, []}),
-             {reply, {ok, {0, Ts, Pid}}, State_1#state{max_sessions      = Max_sess,
-                                                   max_pipeline_size = Max_pipe}}
++            ets:insert(Tid, {{1, Ts, Pid}, []}),
++            {reply, {ok, {1, Ts, Pid}}, State_1#state{max_sessions      = Max_sess,
++						      max_pipeline_size = Max_pipe}}
 +    end;
  
  handle_call(Request, _From, State) ->
      Reply = {unknown_request, Request},
@@@ -215,18 -214,21 +215,13 @@@ code_change(_OldVsn, State, _Extra) -
  %%--------------------------------------------------------------------
  %%% Internal functions
  %%--------------------------------------------------------------------
- find_best_connection(Tid, Max_pipe, _Num_cur) ->
 -find_best_connection(Tid, Max_pipeline_size) ->
 -    find_best_connection(Tid, Max_pipeline_size, ?MAX_RETRIES).
 -
 -find_best_connection(_Tid, _Max_pipeline_size, 0) ->
 -    {error, retry_later};
 -find_best_connection(Tid, Max_pipeline_size, RemainingRetries) ->
++find_best_connection(Tid, Max_pipe) ->
      case ets:first(Tid) of
-         {Spec_size, Ts, Pid} = First ->
-             case Spec_size >= Max_pipe of
 -        {Size, _Timestamp, Pid} = Key when Size < Max_pipeline_size ->
 -            case record_request_for_connection(Tid, Key) of
--                true ->
-                     {error, retry_later};
 -                    {ok, Pid};
--                false ->
-                     ets:delete(Tid, First),
-                     ets:insert(Tid, {{Spec_size + 1, Ts, Pid}, []}),
-                     {ok, First}
 -                    find_best_connection(Tid, Max_pipeline_size, RemainingRetries - 1)
--            end;
-         '$end_of_table' ->
 -        _ -> 
++        {Spec_size, Ts, Pid} = First when Spec_size < Max_pipe ->
++	    ets:delete(Tid, First),
++	    ets:insert(Tid, {{Spec_size + 1, Ts, Pid}, []}),
++	    {ok, First};
++        _ ->
              {error, retry_later}
      end.
  

http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/blob/8494e943/src/ibrowse_lib.erl
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/blob/8494e943/test/ibrowse_functional_tests.erl
----------------------------------------------------------------------
diff --cc test/ibrowse_functional_tests.erl
index 0000000,e55c5b2..3517011
mode 000000,100644..100644
--- a/test/ibrowse_functional_tests.erl
+++ b/test/ibrowse_functional_tests.erl
@@@ -1,0 -1,171 +1,174 @@@
+ %%% File    : ibrowse_functional_tests.erl
+ %%% Authors : Benjamin Lee <http://github.com/benjaminplee>
+ %%%           Dan Schwabe <http://github.com/dfschwabe>
+ %%%           Brian Richards <http://github.com/richbria>
+ %%% Description : Functional tests of the ibrowse library using a live test HTTP server
+ %%% Created : 18 November 2014 by Benjamin Lee <ya...@gmail.com>
+ 
+ -module(ibrowse_functional_tests).
+ 
+ -include_lib("eunit/include/eunit.hrl").
+ -define(PER_TEST_TIMEOUT_SEC, 60).
+ -define(TIMEDTEST(Desc, Fun), {Desc, {timeout, ?PER_TEST_TIMEOUT_SEC, fun Fun/0}}).
+ 
+ -define(SERVER_PORT, 8181).
+ -define(BASE_URL, "http://localhost:" ++ integer_to_list(?SERVER_PORT)).
+ -define(SHORT_TIMEOUT_MS, 5000).
+ -define(LONG_TIMEOUT_MS, 30000).
+ -define(PAUSE_FOR_CONNECTIONS_MS, 2000).
+ 
++-compile(export_all).
++
+ setup() ->
+     application:start(crypto),
+     application:start(public_key),
+     application:start(ssl),
+     ibrowse_test_server:start_server(?SERVER_PORT, tcp),
+     ibrowse:start(),
+     ok.
+ 
+ teardown(_) ->
+     ibrowse:stop(),
+     ibrowse_test_server:stop_server(?SERVER_PORT),
+     ok.
+ 
+ running_server_fixture_test_() ->
+     {foreach,
+      fun setup/0,
+      fun teardown/1,
+      [
+         ?TIMEDTEST("Simple request can be honored", simple_request),
+         ?TIMEDTEST("Slow server causes timeout", slow_server_timeout),
+         ?TIMEDTEST("Pipeline depth goes down with responses", pipeline_depth),
+         ?TIMEDTEST("Pipelines refill", pipeline_refill),
+         ?TIMEDTEST("Timeout closes pipe", closing_pipes),
+         ?TIMEDTEST("Requests are balanced over connections", balanced_connections),
+         ?TIMEDTEST("Pipeline too small signals retries", small_pipeline),
+         ?TIMEDTEST("Dest status can be gathered", status)
+      ]
+     }.
+ 
+ simple_request() ->
+     ?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [])).
+ 
+ slow_server_timeout() ->
+     ?assertMatch({error, req_timedout}, ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [], 5000)).
+ 
+ pipeline_depth() ->
+     MaxSessions = 2,
+     MaxPipeline = 2,
+     RequestsSent = 2,
+     EmptyPipelineDepth = 0,
+ 
+     ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
+ 
+     Fun = fun() -> ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
+     times(RequestsSent, fun() -> spawn_link(Fun) end),
+ 
+     timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
+ 
+     Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
+     ?assertEqual(MaxSessions, length(Counts)),
+     ?assertEqual(lists:duplicate(MaxSessions, EmptyPipelineDepth), Counts).
+ 
+ pipeline_refill() ->
+     MaxSessions = 2,
+     MaxPipeline = 2,
+     RequestsToFill = MaxSessions * MaxPipeline,
+ 
+     %% Send off enough requests to fill sessions and pipelines in rapid succession
+     Fun = fun() -> ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
+     times(RequestsToFill, fun() -> spawn_link(Fun) end),
+     timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
+ 
+     % Verify that connections properly reported their completed responses and can still accept more
+     ?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS)),
+ 
+     % and do it again to make sure we really are clear
+     times(RequestsToFill, fun() -> spawn_link(Fun) end),
+     timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
+ 
+     % Verify that connections properly reported their completed responses and can still accept more
+     ?assertMatch({ok, "200", _, _}, ibrowse:send_req(?BASE_URL, [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS)).
+ 
+ closing_pipes() ->
+     MaxSessions = 2,
+     MaxPipeline = 2,
+     RequestsSent = 2,
+     BalancedNumberOfRequestsPerConnection = 1,
+ 
+     ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
+ 
+     Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
+     times(RequestsSent, fun() -> spawn_link(Fun) end),
+ 
+     timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
+ 
+     Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
+     ?assertEqual(MaxSessions, length(Counts)),
+     ?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts),
+ 
+     timer:sleep(?SHORT_TIMEOUT_MS),
+ 
+     ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()).
+ 
+ balanced_connections() ->
+     MaxSessions = 4,
+     MaxPipeline = 100,
+     RequestsSent = 80,
+     BalancedNumberOfRequestsPerConnection = 20,
+ 
+     ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
+ 
+     Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?LONG_TIMEOUT_MS) end,
+     times(RequestsSent, fun() -> spawn_link(Fun) end),
+ 
+     timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),
+ 
+     Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
+     ?assertEqual(MaxSessions, length(Counts)),
+ 
+     ?assertEqual(lists:duplicate(MaxSessions, BalancedNumberOfRequestsPerConnection), Counts).
+ 
+ small_pipeline() ->
+     MaxSessions = 10,
+     MaxPipeline = 10,
+     RequestsSent = 100,
+     FullRequestsPerConnection = 10,
+ 
+     ?assertEqual([], ibrowse_test_server:get_conn_pipeline_depth()),
+ 
+     Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
+     times(RequestsSent, fun() -> spawn(Fun) end),
+ 
+     timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),  %% Wait for everyone to get in line
+ 
++    ibrowse:show_dest_status("localhost", 8181),
+     Counts = [Count || {_Pid, Count} <- ibrowse_test_server:get_conn_pipeline_depth()],
+     ?assertEqual(MaxSessions, length(Counts)),
+ 
+     ?assertEqual(lists:duplicate(MaxSessions, FullRequestsPerConnection), Counts),
+ 
+     Response = ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS),
+ 
+     ?assertEqual({error, retry_later}, Response).
+ 
+ status() ->
+     MaxSessions = 10,
+     MaxPipeline = 10,
+     RequestsSent = 100,
+ 
+     Fun = fun() -> ibrowse:send_req(?BASE_URL ++ "/never_respond", [], get, [], [{max_sessions, MaxSessions}, {max_pipeline_size, MaxPipeline}], ?SHORT_TIMEOUT_MS) end,
+     times(RequestsSent, fun() -> spawn(Fun) end),
+ 
+     timer:sleep(?PAUSE_FOR_CONNECTIONS_MS),  %% Wait for everyone to get in line
+ 
+     ibrowse:show_dest_status(),
+     ibrowse:show_dest_status("http://localhost:8181").
+ 
+ 
+ times(0, _) ->
+     ok;
+ times(X, Fun) ->
+     Fun(),
+     times(X - 1, Fun).

http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/blob/8494e943/test/ibrowse_test.erl
----------------------------------------------------------------------
diff --cc test/ibrowse_test.erl
index e216e82,4ddb9c1..0787493
--- a/test/ibrowse_test.erl
+++ b/test/ibrowse_test.erl
@@@ -35,13 -34,9 +35,13 @@@
           test_303_response_with_a_body/1,
           test_binary_headers/0,
           test_binary_headers/1,
 -         test_generate_body_0/0
 +         test_generate_body_0/0,
 +         test_retry_of_requests/0,
 +         test_retry_of_requests/1
  	]).
  
- -include("ibrowse.hrl").
++-include_lib("ibrowse/include/ibrowse.hrl").
 +
  test_stream_once(Url, Method, Options) ->
      test_stream_once(Url, Method, Options, 5000).
  
@@@ -214,65 -207,56 +214,65 @@@ dump_errors(Key, Iod) -
  %%------------------------------------------------------------------------------
  %% Unit Tests
  %%------------------------------------------------------------------------------
 +-define(LOCAL_TESTS, [
 +                      {local_test_fun, test_20122010, []},
 +                      {local_test_fun, test_pipeline_head_timeout, []},
 +                      {local_test_fun, test_head_transfer_encoding, []},
 +                      {local_test_fun, test_head_response_with_body, []},
 +                      {local_test_fun, test_303_response_with_a_body, []},
 +                      {local_test_fun, test_binary_headers, []},
 +                      {local_test_fun, test_retry_of_requests, []}
 +                     ]).
 +
  -define(TEST_LIST, [{"http://intranet/messenger", get},
 -            {"http://www.google.co.uk", get},
 -            {"http://www.google.com", get},
 -            {"http://www.google.com", options},
 -            {"https://mail.google.com", get},
 -            {"http://www.sun.com", get},
 -            {"http://www.oracle.com", get},
 -            {"http://www.bbc.co.uk", get},
 -            {"http://www.bbc.co.uk", trace},
 -            {"http://www.bbc.co.uk", options},
 -            {"http://yaws.hyber.org", get},
 -            {"http://jigsaw.w3.org/HTTP/ChunkedScript", get},
 -            {"http://jigsaw.w3.org/HTTP/TE/foo.txt", get},
 -            {"http://jigsaw.w3.org/HTTP/TE/bar.txt", get},
 -            {"http://jigsaw.w3.org/HTTP/connection.html", get},
 -            {"http://jigsaw.w3.org/HTTP/cc.html", get},
 -            {"http://jigsaw.w3.org/HTTP/cc-private.html", get},
 -            {"http://jigsaw.w3.org/HTTP/cc-proxy-revalidate.html", get},
 -            {"http://jigsaw.w3.org/HTTP/cc-nocache.html", get},
 -            {"http://jigsaw.w3.org/HTTP/h-content-md5.html", get},
 -            {"http://jigsaw.w3.org/HTTP/h-retry-after.html", get},
 -            {"http://jigsaw.w3.org/HTTP/h-retry-after-date.html", get},
 -            {"http://jigsaw.w3.org/HTTP/neg", get},
 -            {"http://jigsaw.w3.org/HTTP/negbad", get},
 -            {"http://jigsaw.w3.org/HTTP/400/toolong/", get},
 -            {"http://jigsaw.w3.org/HTTP/300/", get},
 -            {"http://jigsaw.w3.org/HTTP/Basic/", get, [{basic_auth, {"guest", "guest"}}]},
 -            {"http://jigsaw.w3.org/HTTP/CL/", get},
 -            {"http://www.httpwatch.com/httpgallery/chunked/", get},
 -            {"https://github.com", get, [{ssl_options, [{depth, 2}]}]},
 -            {local_test_fun, test_20122010, []},
 -            {local_test_fun, test_pipeline_head_timeout, []},
 -            {local_test_fun, test_head_transfer_encoding, []},
 -            {local_test_fun, test_head_response_with_body, []},
 -            {local_test_fun, test_303_response_with_a_body, []},
 -            {local_test_fun, test_binary_headers, []}
 -           ]).
 +		    {"http://www.google.co.uk", get},
 +		    {"http://www.google.com", get},
 +		    {"http://www.google.com", options},
 +                    {"https://mail.google.com", get},
 +		    {"http://www.sun.com", get},
 +		    {"http://www.oracle.com", get},
 +		    {"http://www.bbc.co.uk", get},
 +		    {"http://www.bbc.co.uk", trace},
 +		    {"http://www.bbc.co.uk", options},
 +		    {"http://yaws.hyber.org", get},
 +		    {"http://jigsaw.w3.org/HTTP/ChunkedScript", get},
 +		    {"http://jigsaw.w3.org/HTTP/TE/foo.txt", get},
 +		    {"http://jigsaw.w3.org/HTTP/TE/bar.txt", get},
 +		    {"http://jigsaw.w3.org/HTTP/connection.html", get},
 +		    {"http://jigsaw.w3.org/HTTP/cc.html", get},
 +		    {"http://jigsaw.w3.org/HTTP/cc-private.html", get},
 +		    {"http://jigsaw.w3.org/HTTP/cc-proxy-revalidate.html", get},
 +		    {"http://jigsaw.w3.org/HTTP/cc-nocache.html", get},
 +		    {"http://jigsaw.w3.org/HTTP/h-content-md5.html", get},
 +		    {"http://jigsaw.w3.org/HTTP/h-retry-after.html", get},
 +		    {"http://jigsaw.w3.org/HTTP/h-retry-after-date.html", get},
 +		    {"http://jigsaw.w3.org/HTTP/neg", get},
 +		    {"http://jigsaw.w3.org/HTTP/negbad", get},
 +		    {"http://jigsaw.w3.org/HTTP/400/toolong/", get},
 +		    {"http://jigsaw.w3.org/HTTP/300/", get},
 +		    {"http://jigsaw.w3.org/HTTP/Basic/", get, [{basic_auth, {"guest", "guest"}}]},
 +		    {"http://jigsaw.w3.org/HTTP/CL/", get},
 +		    {"http://www.httpwatch.com/httpgallery/chunked/", get},
 +                    {"https://github.com", get, [{ssl_options, [{depth, 2}]}]}
 +		   ] ++ ?LOCAL_TESTS).
 +
 +local_unit_tests() ->
-     error_logger:tty(false),
-     unit_tests([], ?LOCAL_TESTS),
-     error_logger:tty(true).
++    unit_tests([], ?LOCAL_TESTS).
  
  unit_tests() ->
-     unit_tests([], ?TEST_LIST).
 -    unit_tests([]).
++    error_logger:tty(false),
++    unit_tests([], ?TEST_LIST),
++    error_logger:tty(true).
  
 -unit_tests(Options) ->
 +unit_tests(Options, Test_list) ->
      application:start(crypto),
      application:start(public_key),
--    application:start(ssl),
++    application:ensure_all_started(ssl),
      (catch ibrowse_test_server:start_server(8181, tcp)),
 -    ibrowse:start(),
 +    application:start(ibrowse),
      Options_1 = Options ++ [{connect_timeout, 5000}],
      Test_timeout = proplists:get_value(test_timeout, Options, 60000),
 -    {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1]),
 +    {Pid, Ref} = erlang:spawn_monitor(?MODULE, unit_tests_1, [self(), Options_1, Test_list]),
      receive 
  	{done, Pid} ->
  	    ok;
@@@ -387,6 -371,6 +387,8 @@@ wait_for_resp(Pid) -
  	    {'EXIT', Reason};
  	{'DOWN', _, _, _, _} ->
  	    wait_for_resp(Pid);
++	{'EXIT', _, normal} ->
++	    wait_for_resp(Pid);
  	Msg ->
  	    io:format("Recvd unknown message: ~p~n", [Msg]),
  	    wait_for_resp(Pid)
@@@ -556,74 -539,6 +558,74 @@@ test_303_response_with_a_body(Url) -
      end.
  
  %%------------------------------------------------------------------------------
 +%% Test that retry of requests happens correctly, and that ibrowse doesn't retry
 +%% if there is not enough time left
 +%%------------------------------------------------------------------------------
 +test_retry_of_requests() ->
 +    clear_msg_q(),
 +    test_retry_of_requests("http://localhost:8181/ibrowse_handle_one_request_only_with_delay").
 +
 +test_retry_of_requests(Url) ->
++    reset_ibrowse(),
 +    Timeout_1 = 2050,
 +    Res_1 = test_retry_of_requests(Url, Timeout_1),
 +    case lists:filter(fun({_Pid, {ok, "200", _, _}}) ->
 +                              true;
 +                         (_) -> false
 +                      end, Res_1) of
 +        [_|_] = X ->
 +            Res_1_1 = Res_1 -- X,
 +            case lists:all(
 +                   fun({_Pid, {error, retry_later}}) ->
 +                           true;
 +                      (_) ->
 +                           false
 +                   end, Res_1_1) of
 +                true ->
 +                    ok;
 +                false ->
 +                    exit({failed, Timeout_1, Res_1})
 +            end;
 +        _ ->
 +            exit({failed, Timeout_1, Res_1})
 +    end,
-     reset_ibrowse(),
 +    Timeout_2 = 2200,
 +    Res_2 = test_retry_of_requests(Url, Timeout_2),
 +    case lists:filter(fun({_Pid, {ok, "200", _, _}}) ->
 +                              true;
 +                         (_) -> false
 +                      end, Res_2) of
 +        [_|_] = Res_2_X ->
 +            Res_2_1 = Res_2 -- Res_2_X,
 +            case lists:all(
 +                   fun({_Pid, {error, X_err_2}}) ->
 +                           (X_err_2 == retry_later) orelse (X_err_2 == req_timedout);
 +                      (_) ->
 +                           false
 +                   end, Res_2_1) of
 +                true ->
 +                    ok;
 +                false ->
-                     exit({failed, Timeout_2, Res_2})
++                    exit({failed, {?MODULE, ?LINE}, Timeout_2, Res_2})
 +            end;
 +        _ ->
-             exit({failed, Timeout_2, Res_2})
++            exit({failed, {?MODULE, ?LINE}, Timeout_2, Res_2})
 +    end,
 +    success.
 +
 +test_retry_of_requests(Url, Timeout) ->
 +    #url{host = Host, port = Port} = ibrowse_lib:parse_url(Url),
 +    ibrowse:set_max_sessions(Host, Port, 1),
 +    Parent = self(),
 +    Pids = lists:map(fun(_) ->
 +                        spawn(fun() ->
 +                                 Res = (catch ibrowse:send_req(Url, [], get, [], [], Timeout)),
 +                                 Parent ! {self(), Res}
 +                              end)
 +                     end, lists:seq(1,10)),
 +    accumulate_worker_resp(Pids).
 +
 +%%------------------------------------------------------------------------------
  %% Test what happens when the request at the head of a pipeline times out
  %%------------------------------------------------------------------------------
  test_pipeline_head_timeout() ->

http://git-wip-us.apache.org/repos/asf/couchdb-ibrowse/blob/8494e943/test/ibrowse_test_server.erl
----------------------------------------------------------------------
diff --cc test/ibrowse_test_server.erl
index 1d72210,dc0d7e2..7025286
--- a/test/ibrowse_test_server.erl
+++ b/test/ibrowse_test_server.erl
@@@ -15,29 -21,26 +21,30 @@@
  
  start_server(Port, Sock_type) ->
      Fun = fun() ->
-         Proc_name = server_proc_name(Port),
-         case whereis(Proc_name) of
-             undefined ->
-                 register(Proc_name, self()),
-                 case do_listen(Sock_type, Port, [{active, false},
-                     {reuseaddr, true},
-                     {nodelay, true},
-                     {packet, http}]) of
-                     {ok, Sock} ->
-                         do_trace("Server listening on port: ~p~n", [Port]),
-                         accept_loop(Sock, Sock_type);
-                     Err ->
-                         erlang:error(
 -                  Name = server_proc_name(Port),
 -                  register(Name, self()),
 -                  ets:new(?CONN_PIPELINE_DEPTH, [named_table, public, set]),
 -                  case do_listen(Sock_type, Port, [{active, false},
 -                                                   {reuseaddr, true},
 -                                                   {nodelay, true},
 -                                                   {packet, http}]) of
 -                      {ok, Sock} ->
 -                          do_trace("Server listening on port: ~p~n", [Port]),
 -                          accept_loop(Sock, Sock_type);
 -                      Err ->
 -                          erlang:error(
--                            lists:flatten(
-                                 io_lib:format(
-                                     "Failed to start server on port ~p. ~p~n",
-                                     [Port, Err]))),
-                         exit({listen_error, Err})
-                 end;
-             _X ->
-                 ok
-         end
-     end,
 -                              io_lib:format(
 -                                "Failed to start server on port ~p. ~p~n",
 -                                [Port, Err]))),
 -                          exit({listen_error, Err})
 -                  end,
 -                  unregister(Name)
 -          end,
++		  Proc_name = server_proc_name(Port),
++		  case whereis(Proc_name) of
++		      undefined ->
++			  register(Proc_name, self()),
++			  ets:new(?CONN_PIPELINE_DEPTH, [named_table, public, set]),
++			  case do_listen(Sock_type, Port, [{active, false},
++							   {reuseaddr, true},
++							   {nodelay, true},
++							   {packet, http}]) of
++			      {ok, Sock} ->
++				  do_trace("Server listening on port: ~p~n", [Port]),
++				  accept_loop(Sock, Sock_type);
++			      Err ->
++				  erlang:error(
++				    lists:flatten(
++				      io_lib:format(
++					"Failed to start server on port ~p. ~p~n",
++					[Port, Err]))),
++				  exit({listen_error, Err})
++			  end;
++		      _X ->
++			  ok
++		  end
++	  end,
      spawn_link(Fun).
  
  stop_server(Port) ->
@@@ -86,6 -105,7 +109,7 @@@ setopts(Sock, ssl, Opts) -
  server_loop(Sock, Sock_type, #request{headers = Headers} = Req) ->
      receive
          {http, Sock, {http_request, HttpMethod, HttpUri, HttpVersion}} ->
 -            ets:update_counter(?CONN_PIPELINE_DEPTH, self(), 1),
++            catch ets:update_counter(?CONN_PIPELINE_DEPTH, self(), 1),
              server_loop(Sock, Sock_type, Req#request{method = HttpMethod,
                                                       uri = HttpUri,
                                                       version = HttpVersion});
@@@ -93,15 -113,16 +117,18 @@@
              server_loop(Sock, Sock_type, Req#request{headers = [H | Headers]});
          {http, Sock, http_eoh} ->
              case process_request(Sock, Sock_type, Req) of
 +                close_connection ->
 +                    gen_tcp:shutdown(Sock, read_write);
+                 not_done ->
+                     ok;
                  _ ->
-                     server_loop(Sock, Sock_type, #request{})
-             end;
 -                    ets:update_counter(?CONN_PIPELINE_DEPTH, self(), -1)
++                    catch ets:update_counter(?CONN_PIPELINE_DEPTH, self(), -1)
+             end,
+             server_loop(Sock, Sock_type, #request{});
          {http, Sock, {http_error, Err}} ->
 -            do_trace("Error parsing HTTP request:~n"
 -                     "Req so far : ~p~n"
 -                     "Err        : ", [Req, Err]),
 +            io:format("Error parsing HTTP request:~n"
 +                      "Req so far : ~p~n"
 +                      "Err        : ~p", [Req, Err]),
              exit({http_error, Err});
          {setopts, Opts} ->
              setopts(Sock, Sock_type, Opts),
@@@ -109,12 -130,10 +136,10 @@@
          {tcp_closed, Sock} ->
              do_trace("Client closed connection~n", []),
              ok;
-         stop ->
-             ok;
          Other ->
 -            do_trace("Recvd unknown msg: ~p~n", [Other]),
 +            io:format("Recvd unknown msg: ~p~n", [Other]),
              exit({unknown_msg, Other})
 -    after 5000 ->
 +    after 120000 ->
              do_trace("Timing out client connection~n", []),
              ok
      end.
@@@ -200,21 -218,8 +224,23 @@@ process_request(Sock, Sock_type
                           uri = {abs_path, "/ibrowse_303_with_body_test"}}) ->
      Resp = <<"HTTP/1.1 303 See Other\r\nLocation: http://example.org\r\nContent-Length: 5\r\n\r\nabcde">>,
      do_send(Sock, Sock_type, Resp);
 +process_request(Sock, Sock_type,
 +    #request{method='GET',
 +        headers = _Headers,
 +        uri = {abs_path, "/ibrowse_handle_one_request_only_with_delay"}}) ->
 +    timer:sleep(2000),
 +    Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>,
 +    do_send(Sock, Sock_type, Resp),
 +    close_connection;
 +process_request(Sock, Sock_type,
 +    #request{method='GET',
 +        headers = _Headers,
 +        uri = {abs_path, "/ibrowse_handle_one_request_only"}}) ->
 +    Resp = <<"HTTP/1.1 200 OK\r\nServer: Apache-Coyote/1.1\r\nDate: Wed, 04 Apr 2012 16:53:49 GMT\r\nConnection: close\r\n\r\n">>,
 +    do_send(Sock, Sock_type, Resp),
 +    close_connection;
+ process_request(_Sock, _Sock_type, #request{uri = {abs_path, "/never_respond"} } ) ->
+     not_done;
  process_request(Sock, Sock_type, Req) ->
      do_trace("Recvd req: ~p~n", [Req]),
      Resp = <<"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n">>,