You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ja...@apache.org on 2015/04/27 04:14:09 UTC

[1/4] trafficserver git commit: Fix KA check for chunked responses by tracking a conn-id

Repository: trafficserver
Updated Branches:
  refs/heads/master 778b952d9 -> e709f7c08


Fix KA check for chunked responses by tracking a conn-id


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/74ddc5e2
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/74ddc5e2
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/74ddc5e2

Branch: refs/heads/master
Commit: 74ddc5e2490d622258975cfe75a090336e596e30
Parents: 778b952
Author: Thomas Jackson <ja...@apache.org>
Authored: Fri Apr 24 08:29:45 2015 -0700
Committer: Thomas Jackson <ja...@apache.org>
Committed: Sun Apr 26 19:12:57 2015 -0700

----------------------------------------------------------------------
 ci/new_tsqa/tests/test_chunked.py | 34 ++++++++++++++++++++++------------
 1 file changed, 22 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/74ddc5e2/ci/new_tsqa/tests/test_chunked.py
----------------------------------------------------------------------
diff --git a/ci/new_tsqa/tests/test_chunked.py b/ci/new_tsqa/tests/test_chunked.py
index 8302869..16a2b77 100644
--- a/ci/new_tsqa/tests/test_chunked.py
+++ b/ci/new_tsqa/tests/test_chunked.py
@@ -23,6 +23,7 @@ import time
 import logging
 import json
 import threading
+import uuid
 
 import helpers
 
@@ -45,6 +46,7 @@ class ChunkedHandler(SocketServer.BaseRequestHandler):
 
     def handle(self):
         # Receive the data in small chunks and retransmit it
+        conn_id = uuid.uuid4().hex
         while True:
             data = self.request.recv(4096).strip()
             if data:
@@ -67,6 +69,7 @@ class ChunkedHandler(SocketServer.BaseRequestHandler):
                     close = json.loads(uri_parts[2])
 
             resp = ('HTTP/1.1 200 OK\r\n'
+                    'X-Conn-Id: ' + str(conn_id) + '\r\n'
                     'Transfer-Encoding: chunked\r\n'
                     '\r\n')
             self.request.sendall(resp)
@@ -79,8 +82,6 @@ class ChunkedHandler(SocketServer.BaseRequestHandler):
                 self.request.sendall('lkfjasd;lfjas;d')
 
             time.sleep(2)
-            self.request.close()
-            return
 
 class TestChunked(helpers.EnvironmentCase):
     @classmethod
@@ -110,16 +111,23 @@ class TestChunked(helpers.EnvironmentCase):
         '''
         Test that the origin does in fact support keepalive
         '''
-        url = 'http://127.0.0.1:{0}/'.format(self.port)
-        self.assertEqual(requests.get(url).text, '01234')
-
-        url = 'http://127.0.0.1:{0}/2'.format(self.port)
-        self.assertEqual(requests.get(url).text, '01')
-
-        url = 'http://127.0.0.1:{0}/2/1'.format(self.port)
-        start = time.time()
-        self.assertEqual(requests.get(url).text, '01')
-        self.assertTrue(time.time() - start > 2)
+        with requests.Session() as s:
+            url = 'http://127.0.0.1:{0}/'.format(self.port)
+            ret = s.get(url)
+            conn_id = ret.headers['x-conn-id']
+            self.assertEqual(ret.text, '01234')
+
+            url = 'http://127.0.0.1:{0}/2'.format(self.port)
+            ret = s.get(url)
+            self.assertEqual(ret.text, '01')
+            self.assertEqual(ret.headers['x-conn-id'], conn_id)
+
+            url = 'http://127.0.0.1:{0}/2/1'.format(self.port)
+            start = time.time()
+            ret = s.get(url)
+            self.assertEqual(ret.text, '01')
+            self.assertEqual(ret.headers['x-conn-id'], conn_id)
+            self.assertTrue(time.time() - start > 2)
 
     def test_chunked_basic(self):
         url = 'http://127.0.0.1:{0}'.format(self.port)
@@ -131,6 +139,7 @@ class TestChunked(helpers.EnvironmentCase):
     def test_chunked_keepalive(self):
         url = 'http://127.0.0.1:{0}'.format(self.port)
         ret = requests.get(url, proxies=self.proxies)
+        conn_id = ret.headers['x-conn-id']
         self.assertEqual(ret.status_code, 200)
         self.assertEqual(ret.text.strip(), '01234')
 
@@ -139,6 +148,7 @@ class TestChunked(helpers.EnvironmentCase):
         ret = requests.get(url, proxies=self.proxies)
         self.assertEqual(ret.status_code, 200)
         self.assertEqual(ret.text.strip(), '01234')
+        self.assertEqual(conn_id, ret.headers['x-conn-id'])
 
     def test_chunked_bad_close(self):
         url = 'http://127.0.0.1:{0}/5/0.1/false'.format(self.port)


[3/4] trafficserver git commit: Add tests for auth_server_session_private

Posted by ja...@apache.org.
Add tests for auth_server_session_private


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/e709f7c0
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/e709f7c0
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/e709f7c0

Branch: refs/heads/master
Commit: e709f7c08d0516fd03a9a906d11d6fa33245cb23
Parents: f54516c
Author: Thomas Jackson <ja...@gmail.com>
Authored: Fri Apr 24 19:29:33 2015 -0700
Committer: Thomas Jackson <ja...@apache.org>
Committed: Sun Apr 26 19:13:41 2015 -0700

----------------------------------------------------------------------
 ci/new_tsqa/tests/test_keepalive.py | 125 +++++++++++++++++++++++++++----
 1 file changed, 111 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e709f7c0/ci/new_tsqa/tests/test_keepalive.py
----------------------------------------------------------------------
diff --git a/ci/new_tsqa/tests/test_keepalive.py b/ci/new_tsqa/tests/test_keepalive.py
index c94d4d1..0e501ce 100644
--- a/ci/new_tsqa/tests/test_keepalive.py
+++ b/ci/new_tsqa/tests/test_keepalive.py
@@ -68,13 +68,41 @@ class KeepAliveInMixin(object):
         s.connect(('127.0.0.1', int(self.configs['records.config']['CONFIG']['proxy.config.http.server_ports'])))
         return s
 
-    def _aux_working_path(self, protocol):
+    def _headers_to_str(self, headers):
+        if headers is None:
+            headers = {}
+        request = ''
+        for k, v in headers.iteritems():
+            request += '{0}: {1}\r\n'.format(k, v)
+        return request
+
+    def _aux_KA_working_path_connid(self, protocol, headers=None):
+        # connect tcp
+        s = self._get_socket()
+
+        request = ('GET / HTTP/1.1\r\n'
+                   'Host: foobar.com\r\n')
+        request += self._headers_to_str(headers)
+        request += '\r\n'
+
+        for x in xrange(1, 10):
+            s.send(request)
+            response = s.recv(4096)
+            # cheat, since we know what the body should have
+            if '\r\n\r\n' not in response:
+                response += s.recv(4096)
+            self.assertIn('HTTP/1.1 200 OK', response)
+            self.assertIn('hello', response)
+
+    def _aux_working_path(self, protocol, headers=None):
         # connect tcp
         s = self._get_socket()
 
         request = ('GET /exists/ HTTP/1.1\r\n'
-                   'Host: foobar.com\r\n'
-                   '\r\n')
+                   'Host: foobar.com\r\n')
+        request += self._headers_to_str(headers)
+        request += '\r\n'
+
         for x in xrange(1, 10):
             s.send(request)
             response = s.recv(4096)
@@ -84,19 +112,20 @@ class KeepAliveInMixin(object):
             self.assertIn('HTTP/1.1 200 OK', response)
             self.assertIn('hello', response)
 
-    def _aux_error_path(self, protocol):
+    def _aux_error_path(self, protocol, headers=None):
         # connect tcp
         s = self._get_socket()
 
         request = ('GET / HTTP/1.1\r\n'
-                   'Host: foobar.com\r\n'
-                   '\r\n')
+                   'Host: foobar.com\r\n')
+        request += self._headers_to_str(headers)
+        request += '\r\n'
         for x in xrange(1, 10):
             s.send(request)
             response = s.recv(4096)
             self.assertIn('HTTP/1.1 404 Not Found on Accelerator', response)
 
-    def _aux_error_path_post(self, protocol):
+    def _aux_error_path_post(self, protocol, headers=None):
         '''
         Ensure that sending a request with a body doesn't break the keepalive session
         '''
@@ -105,9 +134,11 @@ class KeepAliveInMixin(object):
 
         request = ('POST / HTTP/1.1\r\n'
                    'Host: foobar.com\r\n'
-                   'Content-Length: 10\r\n'
-                   '\r\n'
-                   '1234567890')
+                   'Content-Length: 10\r\n')
+        request += self._headers_to_str(headers)
+        request += '\r\n'
+        request += '1234567890'
+
         for x in xrange(1, 10):
             try:
                 s.send(request)
@@ -123,7 +154,7 @@ class KeepAliveInMixin(object):
 
 class BasicTestsOutMixin(object):
 
-    def _aux_KA_origin(self, protocol):
+    def _aux_KA_origin(self, protocol, headers=None):
         '''
         Test that the origin does in fact support keepalive
         '''
@@ -131,13 +162,13 @@ class BasicTestsOutMixin(object):
         with requests.Session() as s:
             url = '{0}://127.0.0.1:{1}/'.format(protocol, self.socket_server.port)
             for x in xrange(1, 10):
-                ret = s.get(url, verify=False)
+                ret = s.get(url, verify=False, headers=headers)
                 if not conn_id:
                     conn_id = ret.text.strip()
                 self.assertEqual(ret.status_code, 200)
                 self.assertEqual(ret.text.strip(), conn_id, "Client reports server closed connection")
 
-    def _aux_KA_proxy(self, protocol):
+    def _aux_KA_proxy(self, protocol, headers=None):
         '''
         Test that keepalive works through ATS to that origin
         '''
@@ -145,7 +176,7 @@ class BasicTestsOutMixin(object):
             self.configs['records.config']['CONFIG']['proxy.config.http.server_ports'])
         conn_id = None
         for x in xrange(1, 10):
-            ret = requests.get(url, verify=False)
+            ret = requests.get(url, verify=False, headers=headers)
             if not conn_id:
               conn_id = ret.text.strip()
             self.assertEqual(ret.status_code, 200)
@@ -190,6 +221,7 @@ class OriginMinMaxMixin(object):
         ret = requests.get(url, verify=False)
         self.assertEqual(ret.text.strip(), conn_id, "Client reports server closed connection")
 
+
 class TestKeepAliveInHTTP(tsqa.test_cases.DynamicHTTPEndpointCase, helpers.EnvironmentCase, KeepAliveInMixin):
     @classmethod
     def setUpEnv(cls, env):
@@ -377,3 +409,68 @@ class TestKeepAliveOutHTTPS(helpers.EnvironmentCase, BasicTestsOutMixin, Timeout
     def test_KA_timeout_proxy(self):
         '''Tests that keepalive timeout is honored through ATS to origin via https.'''
         self._aux_KA_timeout_proxy("http")
+
+
+
+# TODO: refactor these tests, these are *very* similar, we should parameterize them
+## Some basic tests for auth_server_session_private
+class TestKeepAlive_Authorization_private(helpers.EnvironmentCase, BasicTestsOutMixin, KeepAliveInMixin):
+    @classmethod
+    def setUpEnv(cls, env):
+
+        cls.socket_server = tsqa.endpoint.SocketServerDaemon(KeepaliveTCPHandler)
+        cls.socket_server.start()
+        cls.socket_server.ready.wait()
+        cls.configs['remap.config'].add_line('map / http://127.0.0.1:{0}/exists/'.format(cls.socket_server.port))
+
+        # only add server headers when there weren't any
+        cls.configs['records.config']['CONFIG']['proxy.config.http.response_server_enabled'] = 2
+        cls.configs['records.config']['CONFIG']['proxy.config.http.keep_alive_enabled_in'] = 1
+        cls.configs['records.config']['CONFIG']['share_server_session'] = 2
+
+        # set only one ET_NET thread (so we don't have to worry about the per-thread pools causing issues)
+        cls.configs['records.config']['CONFIG']['proxy.config.exec_thread.limit'] = 1
+        cls.configs['records.config']['CONFIG']['proxy.config.exec_thread.autoconfig'] = 0
+
+        # make auth sessions private
+        cls.configs['records.config']['CONFIG']['proxy.config.auth_server_session_private'] = 1
+
+    def test_KA_server(self):
+        '''Expects the keepalive check through ATS to origin (http, with an Authorization header) to fail when auth server sessions are private.'''
+        with self.assertRaises(AssertionError):
+            self._aux_KA_proxy("http", headers={'Authorization': 'Foo'})
+
+    def test_KA_client(self):
+        '''Expects the client-side keepalive check against ATS (http, with an Authorization header) to fail when auth server sessions are private.'''
+        with self.assertRaises(AssertionError):
+            self._aux_KA_working_path_connid("http", headers={'Authorization': 'Foo'})
+
+
+class TestKeepAlive_Authorization_no_private(helpers.EnvironmentCase, BasicTestsOutMixin, KeepAliveInMixin):
+    @classmethod
+    def setUpEnv(cls, env):
+
+        cls.socket_server = tsqa.endpoint.SocketServerDaemon(KeepaliveTCPHandler)
+        cls.socket_server.start()
+        cls.socket_server.ready.wait()
+        cls.configs['remap.config'].add_line('map / http://127.0.0.1:{0}/exists/'.format(cls.socket_server.port))
+
+        # only add server headers when there weren't any
+        cls.configs['records.config']['CONFIG']['proxy.config.http.response_server_enabled'] = 2
+        cls.configs['records.config']['CONFIG']['proxy.config.http.keep_alive_enabled_in'] = 1
+        cls.configs['records.config']['CONFIG']['share_server_session'] = 2
+
+        # set only one ET_NET thread (so we don't have to worry about the per-thread pools causing issues)
+        cls.configs['records.config']['CONFIG']['proxy.config.exec_thread.limit'] = 1
+        cls.configs['records.config']['CONFIG']['proxy.config.exec_thread.autoconfig'] = 0
+
+        # do not make auth sessions private
+        cls.configs['records.config']['CONFIG']['proxy.config.http.auth_server_session_private'] = 0
+
+    def test_KA_server(self):
+        '''Tests that keepalive works through ATS to origin via http when an Authorization header is sent.'''
+        self._aux_KA_proxy("http", headers={'Authorization': 'Foo'})
+
+    def test_KA_client(self):
+        '''Tests that client-side keepalive to ATS works via http when an Authorization header is sent.'''
+        self._aux_KA_working_path_connid("http", headers={'Authorization': 'Foo'})


[2/4] trafficserver git commit: Add client side KA chunked test

Posted by ja...@apache.org.
Add client side KA chunked test


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/0d439bf4
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/0d439bf4
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/0d439bf4

Branch: refs/heads/master
Commit: 0d439bf4548e70938dcd33c5ba8aa8ce157a4c49
Parents: 74ddc5e
Author: Thomas Jackson <ja...@apache.org>
Authored: Fri Apr 24 08:56:10 2015 -0700
Committer: Thomas Jackson <ja...@apache.org>
Committed: Sun Apr 26 19:13:41 2015 -0700

----------------------------------------------------------------------
 ci/new_tsqa/tests/test_chunked.py | 38 +++++++++++++++++++++++++++-------
 1 file changed, 30 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/0d439bf4/ci/new_tsqa/tests/test_chunked.py
----------------------------------------------------------------------
diff --git a/ci/new_tsqa/tests/test_chunked.py b/ci/new_tsqa/tests/test_chunked.py
index 16a2b77..645a372 100644
--- a/ci/new_tsqa/tests/test_chunked.py
+++ b/ci/new_tsqa/tests/test_chunked.py
@@ -24,6 +24,7 @@ import logging
 import json
 import threading
 import uuid
+import socket
 
 import helpers
 
@@ -99,13 +100,15 @@ class TestChunked(helpers.EnvironmentCase):
         t.start()
         cls.configs['remap.config'].add_line('map / http://127.0.0.1:{0}/'.format(cls.port))
 
-        cls.configs['records.config']['CONFIG']['proxy.config.http.connect_attempts_timeout'] = 5
-        cls.configs['records.config']['CONFIG']['proxy.config.http.connect_attempts_max_retries'] = 0
-
-        cls.configs['records.config']['CONFIG']['proxy.config.http.keep_alive_enabled_in'] = 1
-        cls.configs['records.config']['CONFIG']['proxy.config.http.keep_alive_enabled_out'] = 0
-        cls.configs['records.config']['CONFIG']['proxy.config.exec_thread.limit'] = 1
-        cls.configs['records.config']['CONFIG']['proxy.config.exec_thread.autoconfig'] = 0
+        cls.configs['records.config']['CONFIG'].update({
+            'proxy.config.http.connect_attempts_timeout': 5,
+            'proxy.config.http.connect_attempts_max_retries': 0,
+            'proxy.config.http.keep_alive_enabled_in': 1,
+            'proxy.config.http.keep_alive_enabled_out': 1,
+            'proxy.config.exec_thread.limit': 1,
+            'proxy.config.exec_thread.autoconfig': 0,
+            'proxy.config.http.chunking_enabled': 1,
+        })
 
     def test_chunked_origin(self):
         '''
@@ -136,7 +139,7 @@ class TestChunked(helpers.EnvironmentCase):
         self.assertEqual(ret.text.strip(), '01234')
 
     # TODO: fix keepalive with chunked responses
-    def test_chunked_keepalive(self):
+    def test_chunked_keepalive_server(self):
         url = 'http://127.0.0.1:{0}'.format(self.port)
         ret = requests.get(url, proxies=self.proxies)
         conn_id = ret.headers['x-conn-id']
@@ -150,6 +153,25 @@ class TestChunked(helpers.EnvironmentCase):
         self.assertEqual(ret.text.strip(), '01234')
         self.assertEqual(conn_id, ret.headers['x-conn-id'])
 
+    def test_chunked_keepalive_client(self):
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s.connect(('127.0.0.1', int(self.configs['records.config']['CONFIG']['proxy.config.http.server_ports'])))
+
+        request = ('GET / HTTP/1.1\r\n'
+                   'Host: 127.0.0.1\r\n'
+                   '\r\n')
+        for x in xrange(1, 10):
+            s.send(request)
+            resp = ''
+            while True:
+                response = s.recv(4096)
+                if '0\r\n\r\n' in response:
+                    break
+                else:
+                    resp += response
+            for x in xrange(0, 4):
+                self.assertIn('1\r\n{0}\r\n'.format(x), resp)
+
     def test_chunked_bad_close(self):
         url = 'http://127.0.0.1:{0}/5/0.1/false'.format(self.port)
         with self.assertRaises(requests.exceptions.ConnectionError):


[4/4] trafficserver git commit: Fix chunked keep-alive tests

Posted by ja...@apache.org.
Fix chunked keep-alive tests


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/f54516c7
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/f54516c7
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/f54516c7

Branch: refs/heads/master
Commit: f54516c7b3d1edc16a68681eb0f31c97f8404625
Parents: 0d439bf
Author: Thomas Jackson <ja...@gmail.com>
Authored: Fri Apr 24 19:00:36 2015 -0700
Committer: Thomas Jackson <ja...@apache.org>
Committed: Sun Apr 26 19:13:41 2015 -0700

----------------------------------------------------------------------
 ci/new_tsqa/tests/test_chunked.py | 102 +++++++++++++++++++--------------
 1 file changed, 59 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f54516c7/ci/new_tsqa/tests/test_chunked.py
----------------------------------------------------------------------
diff --git a/ci/new_tsqa/tests/test_chunked.py b/ci/new_tsqa/tests/test_chunked.py
index 645a372..9b7fdc0 100644
--- a/ci/new_tsqa/tests/test_chunked.py
+++ b/ci/new_tsqa/tests/test_chunked.py
@@ -22,7 +22,6 @@ import requests
 import time
 import logging
 import json
-import threading
 import uuid
 import socket
 
@@ -56,7 +55,10 @@ class ChunkedHandler(SocketServer.BaseRequestHandler):
                 log.info('Client disconnected')
                 break
             inc_lines = data.splitlines()
-            uri = inc_lines[0].split()[1]
+            try:
+                uri = inc_lines[0].split()[1]
+            except IndexError:
+                break
             parts = 5  # how many things to send
             sleep_time = 0.2  # how long to sleep between parts
             close = True  # whether to close properly
@@ -68,10 +70,10 @@ class ChunkedHandler(SocketServer.BaseRequestHandler):
                     sleep_time = float(uri_parts[1])
                 if len(uri_parts) >= 3:
                     close = json.loads(uri_parts[2])
-
             resp = ('HTTP/1.1 200 OK\r\n'
                     'X-Conn-Id: ' + str(conn_id) + '\r\n'
                     'Transfer-Encoding: chunked\r\n'
+                    'Connection: keep-alive\r\n'
                     '\r\n')
             self.request.sendall(resp)
             for x in xrange(0, parts):
@@ -94,10 +96,10 @@ class TestChunked(helpers.EnvironmentCase):
 
         # create a socket server
         cls.port = tsqa.utils.bind_unused_port()[1]
-        server = SocketServer.TCPServer(('127.0.0.1', cls.port), ChunkedHandler)
-        t = threading.Thread(target=server.serve_forever)
-        t.daemon = True
-        t.start()
+        cls.server = tsqa.endpoint.SocketServerDaemon(ChunkedHandler, port=cls.port)
+        cls.server.start()
+        cls.server.ready.wait()
+
         cls.configs['remap.config'].add_line('map / http://127.0.0.1:{0}/'.format(cls.port))
 
         cls.configs['records.config']['CONFIG'].update({
@@ -114,23 +116,53 @@ class TestChunked(helpers.EnvironmentCase):
         '''
         Test that the origin does in fact support keepalive
         '''
-        with requests.Session() as s:
-            url = 'http://127.0.0.1:{0}/'.format(self.port)
-            ret = s.get(url)
-            conn_id = ret.headers['x-conn-id']
-            self.assertEqual(ret.text, '01234')
-
-            url = 'http://127.0.0.1:{0}/2'.format(self.port)
-            ret = s.get(url)
-            self.assertEqual(ret.text, '01')
-            self.assertEqual(ret.headers['x-conn-id'], conn_id)
-
-            url = 'http://127.0.0.1:{0}/2/1'.format(self.port)
-            start = time.time()
-            ret = s.get(url)
-            self.assertEqual(ret.text, '01')
-            self.assertEqual(ret.headers['x-conn-id'], conn_id)
-            self.assertTrue(time.time() - start > 2)
+        self._client_test_chunked_keepalive(self.port)
+        self._client_test_chunked_keepalive(self.port, num_bytes=2)
+        self._client_test_chunked_keepalive(self.port, num_bytes=2, sleep=1)
+
+    def _client_test_chunked_keepalive(self,
+                                       port=None,
+                                       times=3,
+                                       num_bytes=None,
+                                       sleep=None,
+                                       ):
+        if port is None:
+            port = int(self.configs['records.config']['CONFIG']['proxy.config.http.server_ports'])
+        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        s.connect(('127.0.0.1', port))
+
+        url = '/'
+        if num_bytes is not None:
+            url += str(num_bytes)
+        if sleep is not None:
+            if num_bytes is None:
+                raise Exception()
+            url += '/' + str(sleep)
+
+        request = ('GET ' + url + ' HTTP/1.1\r\n'
+                   'Host: 127.0.0.1\r\n'
+                   '\r\n')
+        uuid = None
+        # test basic
+        for x in xrange(1, times):
+            s.send(request)
+            resp = ''
+            while True:
+                response = s.recv(4096)
+                for line in response.splitlines():
+                    line = line.strip()
+                    if line.startswith('X-Conn-Id:'):
+                        r_uuid = line.replace('X-Conn-Id:', '')
+                        if uuid is None:
+                            uuid = r_uuid
+                        else:
+                            self.assertEqual(uuid, r_uuid)
+                resp += response
+                if resp.endswith('\r\n0\r\n\r\n'):
+                    break
+            for x in xrange(0, num_bytes or 4):
+                self.assertIn('1\r\n{0}\r\n'.format(x), resp)
+        s.close()
 
     def test_chunked_basic(self):
         url = 'http://127.0.0.1:{0}'.format(self.port)
@@ -154,27 +186,11 @@ class TestChunked(helpers.EnvironmentCase):
         self.assertEqual(conn_id, ret.headers['x-conn-id'])
 
     def test_chunked_keepalive_client(self):
-        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        s.connect(('127.0.0.1', int(self.configs['records.config']['CONFIG']['proxy.config.http.server_ports'])))
-
-        request = ('GET / HTTP/1.1\r\n'
-                   'Host: 127.0.0.1\r\n'
-                   '\r\n')
-        for x in xrange(1, 10):
-            s.send(request)
-            resp = ''
-            while True:
-                response = s.recv(4096)
-                if '0\r\n\r\n' in response:
-                    break
-                else:
-                    resp += response
-            for x in xrange(0, 4):
-                self.assertIn('1\r\n{0}\r\n'.format(x), resp)
+        self._client_test_chunked_keepalive()
+        self._client_test_chunked_keepalive(num_bytes=2)
+        self._client_test_chunked_keepalive(num_bytes=2, sleep=1)
 
     def test_chunked_bad_close(self):
         url = 'http://127.0.0.1:{0}/5/0.1/false'.format(self.port)
         with self.assertRaises(requests.exceptions.ConnectionError):
             ret = requests.get(url, proxies=self.proxies, timeout=2)
-
-