You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/06/03 23:45:58 UTC

svn commit: r1683432 - in /qpid/trunk/qpid: python/qpid/client.py python/qpid/peer.py python/qpid/testlib.py tests/src/py/qpid_tests/broker_0_9/queue.py

Author: kwall
Date: Wed Jun  3 21:45:58 2015
New Revision: 1683432

URL: http://svn.apache.org/r1683432
Log:
QPID-6567: [Python Client 0-8..0-91] Support producer side flow control in the Python client

* Like the Qpid Java Client, this implementation does not send channel.flow-ok.

Work by Lorenz Quack <qu...@gmail.com>

Modified:
    qpid/trunk/qpid/python/qpid/client.py
    qpid/trunk/qpid/python/qpid/peer.py
    qpid/trunk/qpid/python/qpid/testlib.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py

Modified: qpid/trunk/qpid/python/qpid/client.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/client.py?rev=1683432&r1=1683431&r2=1683432&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/client.py (original)
+++ qpid/trunk/qpid/python/qpid/client.py Wed Jun  3 21:45:58 2015
@@ -83,7 +83,8 @@ class Client:
 
   def start(self, response=None, mechanism=None, locale="en_US", tune_params=None,
             username=None, password=None,
-            client_properties=None, connection_options=None, sasl_options = None):
+            client_properties=None, connection_options=None, sasl_options = None,
+            channel_options=None):
     self.mechanism = mechanism
     self.response = response
     self.username = username
@@ -94,7 +95,7 @@ class Client:
     self.sasl_options = sasl_options
     self.socket = connect(self.host, self.port, connection_options)
     self.conn = Connection(self.socket, self.spec)
-    self.peer = Peer(self.conn, ClientDelegate(self), Session)
+    self.peer = Peer(self.conn, ClientDelegate(self), Session, channel_options)
 
     self.conn.init()
     self.peer.start()
@@ -206,6 +207,9 @@ class ClientDelegate(Delegate):
   def channel_close(self, ch, msg):
     ch.closed(msg)
 
+  def channel_flow(self, ch, msg):
+    ch.set_flow_control(not msg.active)
+
   def session_ack(self, ch, msg):
     pass
 

Modified: qpid/trunk/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/peer.py?rev=1683432&r1=1683431&r2=1683432&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/peer.py (original)
+++ qpid/trunk/qpid/python/qpid/peer.py Wed Jun  3 21:45:58 2015
@@ -31,7 +31,7 @@ from queue import Queue, Closed as Queue
 from content import Content
 from cStringIO import StringIO
 from time import time
-from exceptions import Closed
+from exceptions import Closed, Timeout
 from logging import getLogger
 
 log = getLogger("qpid.peer")
@@ -55,7 +55,7 @@ class Sequence:
 
 class Peer:
 
-  def __init__(self, conn, delegate, channel_factory=None):
+  def __init__(self, conn, delegate, channel_factory=None, channel_options=None):
     self.conn = conn
     self.delegate = delegate
     self.outgoing = Queue(0)
@@ -66,6 +66,9 @@ class Peer:
       self.channel_factory = channel_factory
     else:
       self.channel_factory = Channel
+    if channel_options is None:
+      channel_options = {}
+    self.channel_options = channel_options
 
   def channel(self, id):
     self.lock.acquire()
@@ -73,7 +76,7 @@ class Peer:
       try:
         ch = self.channels[id]
       except KeyError:
-        ch = self.channel_factory(id, self.outgoing, self.conn.spec)
+        ch = self.channel_factory(id, self.outgoing, self.conn.spec, self.channel_options)
         self.channels[id] = ch
     finally:
       self.lock.release()
@@ -205,7 +208,7 @@ class Responder:
 
 class Channel:
 
-  def __init__(self, id, outgoing, spec):
+  def __init__(self, id, outgoing, spec, options):
     self.id = id
     self.outgoing = outgoing
     self.spec = spec
@@ -228,6 +231,10 @@ class Channel:
     self.use_execution_layer = (spec.major == 0 and spec.minor == 10) or (spec.major == 99 and spec.minor == 0)
     self.synchronous = True
 
+    self._flow_control_wait_failure = options.get("qpid.flow_control_wait_failure", 60)
+    self._flow_control_wc = threading.Condition()
+    self._flow_control = False
+
   def closed(self, reason):
     if self._closed:
       return
@@ -339,6 +346,8 @@ class Channel:
       future = Future()
       self.futures[cmd_id] = future
 
+    if frame.method.klass.name == "basic" and frame.method.name == "publish":
+      self.check_flow_control()
     self.write(frame, content)
 
     try:
@@ -381,6 +390,24 @@ class Channel:
       else:
         raise e
 
+  # part of flow control for AMQP 0-8, 0-9, and 0-9-1
+  def set_flow_control(self, value):
+    self._flow_control_wc.acquire()
+    self._flow_control = value
+    if value == False:
+      self._flow_control_wc.notify()
+    self._flow_control_wc.release()
+
+  # part of flow control for AMQP 0-8, 0-9, and 0-9-1
+  def check_flow_control(self):
+    self._flow_control_wc.acquire()
+    if self._flow_control:
+      self._flow_control_wc.wait(self._flow_control_wait_failure)
+    if self._flow_control:
+      self._flow_control_wc.release()
+      raise Timeout("Unable to send message for " + str(self._flow_control_wait_failure) + " seconds due to broker enforced flow control")
+    self._flow_control_wc.release()
+
   def __getattr__(self, name):
     type = self.spec.method(name)
     if type == None: raise AttributeError(name)

Modified: qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/testlib.py?rev=1683432&r1=1683431&r2=1683432&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ qpid/trunk/qpid/python/qpid/testlib.py Wed Jun  3 21:45:58 2015
@@ -73,7 +73,7 @@ class TestBase(unittest.TestCase):
 
         self.client.close()
 
-    def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None):
+    def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None, channel_options=None):
         """Create a new connction, return the Client object"""
         host = host or self.config.broker.host
         port = port or self.config.broker.port or 5672
@@ -81,7 +81,7 @@ class TestBase(unittest.TestCase):
         password = password or self.config.broker.password or "guest"
         client = qpid.client.Client(host, port)
         try:
-          client.start(username = user, password=password, tune_params=tune_params, client_properties=client_properties)
+          client.start(username = user, password=password, tune_params=tune_params, client_properties=client_properties, channel_options=channel_options)
         except qpid.client.Closed, e:
             if isinstance(e.args[0], VersionError):
                 raise Skipped(e.args[0])

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py?rev=1683432&r1=1683431&r2=1683432&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py Wed Jun  3 21:45:58 2015
@@ -16,10 +16,12 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+import time
 from qpid.client import Client, Closed
 from qpid.queue import Empty
 from qpid.content import Content
 from qpid.testlib import TestBase
+from qpid.exceptions import Timeout
 
 class QueueTests(TestBase):
     """Tests for 'methods' on the amqp queue 'class'"""
@@ -109,3 +111,38 @@ class QueueTests(TestBase):
             self.fail("Expected queue to have been deleted")
         except Closed, e:
             self.assertChannelException(404, e.args[0])
+
+    def test_flow_control(self):
+        queue_name="flow-controled-queue"
+
+        connection = self.connect(channel_options={"qpid.flow_control_wait_failure" : 1})
+        channel = connection.channel(1)
+        channel.channel_open()
+        channel.queue_declare(queue=queue_name, arguments={"x-qpid-capacity" : 25, "x-qpid-flow-resume-capacity" : 15})
+
+        try:
+            for i in xrange(100):
+                channel.basic_publish(exchange="", routing_key=queue_name,
+                                      content=Content("This is a message with more than 25 bytes. This should trigger flow control."))
+                time.sleep(.1)
+            self.fail("Flow Control did not work")
+        except Timeout:
+            # this is expected
+            pass
+
+        consumer_reply = channel.basic_consume(queue=queue_name, consumer_tag="consumer", no_ack=True)
+        queue = self.client.queue(consumer_reply.consumer_tag)
+        while True:
+            try:
+                msg = queue.get(timeout=1)
+            except Empty:
+                break
+        channel.basic_cancel(consumer_tag=consumer_reply.consumer_tag)
+
+        try:
+            channel.basic_publish(exchange="", routing_key=queue_name,
+                                  content=Content("This should not block because we have just cleared the queue."))
+        except Timeout:
+            self.fail("Unexpected Timeout. Flow Control should not be in effect.")
+
+        connection.close()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org