You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2012/05/18 23:49:16 UTC
svn commit: r1340271 - in /avro/trunk: BUILD.txt CHANGES.txt
lang/py/setup.py lang/py/src/avro/ipc.py lang/py/test/test_ipc.py
Author: cutting
Date: Fri May 18 21:49:15 2012
New Revision: 1340271
URL: http://svn.apache.org/viewvc?rev=1340271&view=rev
Log:
AVRO-1028. Python: Fix HTTP server to handle connection resets and redirects. Contributed by Bo Shi.
Modified:
avro/trunk/BUILD.txt
avro/trunk/CHANGES.txt
avro/trunk/lang/py/setup.py
avro/trunk/lang/py/src/avro/ipc.py
avro/trunk/lang/py/test/test_ipc.py
Modified: avro/trunk/BUILD.txt
URL: http://svn.apache.org/viewvc/avro/trunk/BUILD.txt?rev=1340271&r1=1340270&r2=1340271&view=diff
==============================================================================
--- avro/trunk/BUILD.txt (original)
+++ avro/trunk/BUILD.txt Fri May 18 21:49:15 2012
@@ -6,7 +6,7 @@ The following packages must be installed
- Java: JDK 1.6, Maven 2 or better, protobuf-compile
- PHP: php5, phpunit, php5-gmp
- - Python: 2.5 or greater, python-setuptools for dist target
+ - Python: 2.5 or greater, urllib3, python-setuptools for dist target
- C: gcc, cmake, asciidoc, source-highlight
- C++: cmake 2.8.4 or greater, g++, flex, bison, libboost-dev
- Ruby: ruby 1.8.6 or greater, ruby-dev, gem, rake, echoe, yajl-ruby
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1340271&r1=1340270&r2=1340271&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri May 18 21:49:15 2012
@@ -43,6 +43,9 @@ Avro 1.7.0 (unreleased)
AVRO-1050. PHP: Optimize memory use by string append. (A B via cutting)
+ AVRO-1028. Python: Fix HTTP server to handle connection resets and
+ redirects. (Bo Shi via cutting)
+
BUG FIXES
AVRO-1045. Java: Fix a bug in GenericData#deepCopy() of ByteBuffer values.
Modified: avro/trunk/lang/py/setup.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/setup.py?rev=1340271&r1=1340270&r2=1340271&view=diff
==============================================================================
--- avro/trunk/lang/py/setup.py (original)
+++ avro/trunk/lang/py/setup.py Fri May 18 21:49:15 2012
@@ -24,7 +24,7 @@ from sys import version_info
if version_info[:2] > (2, 5):
install_requires = ['python-snappy']
else:
- install_requires = ['python-snappy', 'simplejson >= 2.0.9']
+ install_requires = ['python-snappy', 'simplejson >= 2.0.9', 'urllib3']
setup(
name = 'avro',
Modified: avro/trunk/lang/py/src/avro/ipc.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ipc.py?rev=1340271&r1=1340270&r2=1340271&view=diff
==============================================================================
--- avro/trunk/lang/py/src/avro/ipc.py (original)
+++ avro/trunk/lang/py/src/avro/ipc.py Fri May 18 21:49:15 2012
@@ -16,7 +16,8 @@
"""
Support for inter-process calls.
"""
-import httplib
+import uuid
+from urllib3.connectionpool import HTTPConnectionPool
try:
from cStringIO import StringIO
except ImportError:
@@ -439,45 +440,57 @@ class HTTPTransceiver(object):
A simple HTTP-based transceiver implementation.
Useful for clients but not for servers
"""
- def __init__(self, host, port, req_resource='/'):
+ def __init__(self, host, port, req_resource='/', remote_name=None,
+ timeout=None, redirect=True, max_pool_size=1, block=1):
+ """
+ The following parameters set behavior for the underlying connection pool
+ for this transceiver.
+
+ :param redirect:
+ Automatically handle redirects (status codes 301, 302, 303, 307), each
+ redirect counts as a retry.
+
+ :param timeout:
+ Socket timeout for each individual connection, can be a float. None
+ disables timeout.
+
+ :param max_pool_size:
+ Number of connections to save that can be reused. More than 1 is useful
+ in multithreaded situations. If ``block`` is set to false, more
+ connections will be created but they will not be saved once they've been
+ used.
+
+ :param block:
+ If set to True, no more than ``max_pool_size`` connections will be used
+ at a time. When no free connections are available, the call will block
+ until a connection has been released. This is a useful side effect for
+ particular multithreaded situations where one does not want to use more
+ than ``max_pool_size`` connections per host to prevent flooding.
+ """
self.req_resource = req_resource
- self.conn = httplib.HTTPConnection(host, port)
- self.conn.connect()
+ self._remote_name = remote_name or uuid.uuid4()
+ self._pool = HTTPConnectionPool(host, port=port, timeout=timeout,
+ maxsize=max_pool_size, block=block)
+ self._redirect = redirect
# read-only properties
- sock = property(lambda self: self.conn.sock)
- remote_name = property(lambda self: self.sock.getsockname())
-
- # read/write properties
- def set_conn(self, new_conn):
- self._conn = new_conn
- conn = property(lambda self: self._conn, set_conn)
- req_resource = '/'
+ pool = property(lambda self: self._pool)
+ remote_name = property(lambda self: self._remote_name)
+ redirect = property(lambda self: self._redirect)
def transceive(self, request):
- self.write_framed_message(request)
- result = self.read_framed_message()
- return result
-
- def read_framed_message(self):
- response = self.conn.getresponse()
- response_reader = FramedReader(response)
- framed_message = response_reader.read_framed_message()
- response.read() # ensure we're ready for subsequent requests
- return framed_message
-
- def write_framed_message(self, message):
req_method = 'POST'
req_headers = {'Content-Type': 'avro/binary'}
req_body_buffer = FramedWriter(StringIO())
- req_body_buffer.write_framed_message(message)
- req_body = req_body_buffer.writer.getvalue()
+ req_body_buffer.write_framed_message(request)
+ response = self.pool.urlopen(req_method, self.req_resource,
+ body=req_body_buffer.writer.getvalue(),
+ headers=req_headers,
+ redirect=self.redirect)
- self.conn.request(req_method, self.req_resource, req_body, req_headers)
+ return FramedReader(StringIO(response.data)).read_framed_message()
- def close(self):
- self.conn.close()
#
# Server Implementations (none yet)
Modified: avro/trunk/lang/py/test/test_ipc.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_ipc.py?rev=1340271&r1=1340270&r2=1340271&view=diff
==============================================================================
--- avro/trunk/lang/py/test/test_ipc.py (original)
+++ avro/trunk/lang/py/test/test_ipc.py Fri May 18 21:49:15 2012
@@ -17,18 +17,62 @@
There are currently no IPC tests within python, in part because there are no
servers yet available.
"""
+import time
import unittest
+from multiprocessing import Process
# This test does import this code, to make sure it at least passes
# compilation.
-from avro import ipc
+from avro import ipc, txipc
+from twisted.web import server
+from twisted.internet import reactor
+from txsample_http_server import MailResponder, MAIL_PROTOCOL
+
+
+def test_twisted_server():
+ root = server.Site(txipc.AvroResponderResource(MailResponder()))
+ reactor.listenTCP(9097, root)
+ reactor.run()
+
+
+class TestIPCClient(unittest.TestCase):
+ def setUp(self):
+ self.testserver = Process(target=test_twisted_server)
+ self.testserver.start()
+ # Is there a better way to wait until the server is ready to accept
+ # connections?
+ time.sleep(1)
+
+ def tearDown(self):
+ self.testserver.terminate()
+
+ def test_reconnect(self):
+ message = {
+ 'to': 'john@bar.com',
+ 'from': 'jane@baz.org',
+ 'body': 'hello world',
+ }
+
+ client = ipc.HTTPTransceiver('localhost', 9097)
+ requestor = ipc.Requestor(MAIL_PROTOCOL, client)
+
+ expected = u'Sent message to john@bar.com from jane@baz.org with body hello world'
+ params = {'message': message}
+ for msg_count in range(1):
+ self.assertEqual(expected, requestor.request('send', params))
+ self.tearDown()
+ self.setUp()
+ time.sleep(1)
+ for msg_count in range(2):
+ self.assertEqual(expected, requestor.request('send', params))
+
class TestIPC(unittest.TestCase):
def test_placeholder(self):
pass
def test_server_with_path(self):
- client_with_custom_path = ipc.HTTPTransceiver('dummyserver.net', 80, '/service/article')
+ client_with_custom_path = ipc.HTTPTransceiver('dummyserver.net', 80, req_resource='/service/article')
self.assertEqual('/service/article', client_with_custom_path.req_resource)
client_with_default_path = ipc.HTTPTransceiver('dummyserver.net', 80)