You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2012/05/18 23:49:16 UTC

svn commit: r1340271 - in /avro/trunk: BUILD.txt CHANGES.txt lang/py/setup.py lang/py/src/avro/ipc.py lang/py/test/test_ipc.py

Author: cutting
Date: Fri May 18 21:49:15 2012
New Revision: 1340271

URL: http://svn.apache.org/viewvc?rev=1340271&view=rev
Log:
AVRO-1028. Python: Fix HTTP server to handle connection resets and redirects.  Contributed by Bo Shi.

Modified:
    avro/trunk/BUILD.txt
    avro/trunk/CHANGES.txt
    avro/trunk/lang/py/setup.py
    avro/trunk/lang/py/src/avro/ipc.py
    avro/trunk/lang/py/test/test_ipc.py

Modified: avro/trunk/BUILD.txt
URL: http://svn.apache.org/viewvc/avro/trunk/BUILD.txt?rev=1340271&r1=1340270&r2=1340271&view=diff
==============================================================================
--- avro/trunk/BUILD.txt (original)
+++ avro/trunk/BUILD.txt Fri May 18 21:49:15 2012
@@ -6,7 +6,7 @@ The following packages must be installed
 
  - Java: JDK 1.6, Maven 2 or better, protobuf-compile
  - PHP: php5, phpunit, php5-gmp
- - Python: 2.5 or greater, python-setuptools for dist target
+ - Python: 2.5 or greater, urllib3, python-setuptools for dist target
  - C: gcc, cmake, asciidoc, source-highlight
  - C++: cmake 2.8.4 or greater, g++, flex, bison, libboost-dev
  - Ruby: ruby 1.86 or greater, ruby-dev, gem, rake, echoe, yajl-ruby

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1340271&r1=1340270&r2=1340271&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri May 18 21:49:15 2012
@@ -43,6 +43,9 @@ Avro 1.7.0 (unreleased)
 
     AVRO-1050. PHP: Optimize memory use by string append. (A B via cutting)
 
+    AVRO-1028. Python: Fix HTTP server to handle connection resets and
+    redirects.  (Bo Shi via cutting)
+
   BUG FIXES
 
     AVRO-1045. Java: Fix a bug in GenericData#deepCopy() of ByteBuffer values.

Modified: avro/trunk/lang/py/setup.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/setup.py?rev=1340271&r1=1340270&r2=1340271&view=diff
==============================================================================
--- avro/trunk/lang/py/setup.py (original)
+++ avro/trunk/lang/py/setup.py Fri May 18 21:49:15 2012
@@ -24,7 +24,7 @@ from sys import version_info
 if version_info[:2] > (2, 5):
     install_requires = ['python-snappy']
 else:
-    install_requires = ['python-snappy', 'simplejson >= 2.0.9']
+    install_requires = ['python-snappy', 'simplejson >= 2.0.9', 'urllib3']
 
 setup(
   name = 'avro',

Modified: avro/trunk/lang/py/src/avro/ipc.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/src/avro/ipc.py?rev=1340271&r1=1340270&r2=1340271&view=diff
==============================================================================
--- avro/trunk/lang/py/src/avro/ipc.py (original)
+++ avro/trunk/lang/py/src/avro/ipc.py Fri May 18 21:49:15 2012
@@ -16,7 +16,8 @@
 """
 Support for inter-process calls.
 """
-import httplib
+import uuid
+from urllib3.connectionpool import HTTPConnectionPool
 try:
   from cStringIO import StringIO
 except ImportError:
@@ -439,45 +440,57 @@ class HTTPTransceiver(object):
   A simple HTTP-based transceiver implementation.
   Useful for clients but not for servers
   """
-  def __init__(self, host, port, req_resource='/'):
+  def __init__(self, host, port, req_resource='/', remote_name=None,
+               timeout=None, redirect=True, max_pool_size=1, block=1):
+    """
+    The following parameters set behavior for the underlying connection pool
+    for this transceiver.
+
+    :param redirect:
+      Automatically handle redirects (status codes 301, 302, 303, 307); each
+      redirect counts as a retry.
+
+    :param timeout:
+      Socket timeout for each individual connection, can be a float. None
+      disables timeout.
+
+    :param max_pool_size:
+      Number of connections to save that can be reused. More than 1 is useful
+      in multithreaded situations. If ``block`` is set to false, more
+      connections will be created but they will not be saved once they've been
+      used.
+
+    :param block:
+      If set to True, no more than ``max_pool_size`` connections will be used
+      at a time. When no free connections are available, the call will block
+      until a connection has been released. This is a useful side effect for
+      particular multithreaded situations where one does not want to use more
+      than ``max_pool_size`` connections per host to prevent flooding.
+    """
     self.req_resource = req_resource
-    self.conn = httplib.HTTPConnection(host, port)
-    self.conn.connect()
+    self._remote_name = remote_name or uuid.uuid4()
+    self._pool = HTTPConnectionPool(host, port=port, timeout=timeout,
+        maxsize=max_pool_size, block=block)
+    self._redirect = redirect
 
   # read-only properties
-  sock = property(lambda self: self.conn.sock)
-  remote_name = property(lambda self: self.sock.getsockname())
-
-  # read/write properties
-  def set_conn(self, new_conn):
-    self._conn = new_conn
-  conn = property(lambda self: self._conn, set_conn)
-  req_resource = '/'
+  pool = property(lambda self: self._pool)
+  remote_name = property(lambda self: self._remote_name)
+  redirect = property(lambda self: self._redirect)
 
   def transceive(self, request):
-    self.write_framed_message(request)
-    result = self.read_framed_message()
-    return result
-
-  def read_framed_message(self):
-    response = self.conn.getresponse()
-    response_reader = FramedReader(response)
-    framed_message = response_reader.read_framed_message()
-    response.read()    # ensure we're ready for subsequent requests
-    return framed_message
-
-  def write_framed_message(self, message):
     req_method = 'POST'
     req_headers = {'Content-Type': 'avro/binary'}
 
     req_body_buffer = FramedWriter(StringIO())
-    req_body_buffer.write_framed_message(message)
-    req_body = req_body_buffer.writer.getvalue()
+    req_body_buffer.write_framed_message(request)
+    response = self.pool.urlopen(req_method, self.req_resource,
+                                 body=req_body_buffer.writer.getvalue(),
+                                 headers=req_headers,
+                                 redirect=self.redirect)
 
-    self.conn.request(req_method, self.req_resource, req_body, req_headers)
+    return FramedReader(StringIO(response.data)).read_framed_message()
 
-  def close(self):
-    self.conn.close()
 
 #
 # Server Implementations (none yet)

Modified: avro/trunk/lang/py/test/test_ipc.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py/test/test_ipc.py?rev=1340271&r1=1340270&r2=1340271&view=diff
==============================================================================
--- avro/trunk/lang/py/test/test_ipc.py (original)
+++ avro/trunk/lang/py/test/test_ipc.py Fri May 18 21:49:15 2012
@@ -17,18 +17,62 @@
 There are currently no IPC tests within python, in part because there are no
 servers yet available.
 """
+import time
 import unittest
+from multiprocessing import Process
 
 # This test does import this code, to make sure it at least passes
 # compilation.
-from avro import ipc
+from avro import ipc, txipc
+from twisted.web import server
+from twisted.internet import reactor
+from txsample_http_server import MailResponder, MAIL_PROTOCOL
+
+
+def test_twisted_server():
+  root = server.Site(txipc.AvroResponderResource(MailResponder()))
+  reactor.listenTCP(9097, root)
+  reactor.run()
+
+
+class TestIPCClient(unittest.TestCase):
+  def setUp(self):
+    self.testserver = Process(target=test_twisted_server)
+    self.testserver.start()
+    # Is there a better way to wait until the server is ready to accept
+    # connections?
+    time.sleep(1)
+
+  def tearDown(self):
+    self.testserver.terminate()
+
+  def test_reconnect(self):
+    message = {
+      'to': 'john@bar.com',
+      'from': 'jane@baz.org',
+      'body': 'hello world',
+    }
+
+    client = ipc.HTTPTransceiver('localhost', 9097)
+    requestor = ipc.Requestor(MAIL_PROTOCOL, client)
+
+    expected = u'Sent message to john@bar.com from jane@baz.org with body hello world'
+    params = {'message': message}
+    for msg_count in range(1):
+      self.assertEqual(expected, requestor.request('send', params))
+    self.tearDown()
+    self.setUp()
+    time.sleep(1)
+    for msg_count in range(2):
+      self.assertEqual(expected, requestor.request('send', params))
+
 
 class TestIPC(unittest.TestCase):
   def test_placeholder(self):
     pass
 
   def test_server_with_path(self):
-    client_with_custom_path = ipc.HTTPTransceiver('dummyserver.net', 80, '/service/article')
+    client_with_custom_path = ipc.HTTPTransceiver('dummyserver.net', 80, req_resource='/service/article')
     self.assertEqual('/service/article', client_with_custom_path.req_resource)
 
     client_with_default_path = ipc.HTTPTransceiver('dummyserver.net', 80)