You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/09/11 17:29:13 UTC

qpid-python git commit: QPID-7423: [Python Client 0-8..0-91] Chunk large content into many frames sized according to the frame size agreed by connection tune.

Repository: qpid-python
Updated Branches:
  refs/heads/master 586d41988 -> 5c237dc3c


QPID-7423: [Python Client 0-8..0-91] Chunk large content into many frames sized according to the frame size agreed by connection tune.

* Added supporting tests too

0-10 paths unchanged by this commit


Project: http://git-wip-us.apache.org/repos/asf/qpid-python/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-python/commit/5c237dc3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-python/tree/5c237dc3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-python/diff/5c237dc3

Branch: refs/heads/master
Commit: 5c237dc3c4d8ef9ed9f93b5b9ecebb64c41b79c7
Parents: 586d419
Author: Keith Wall <ke...@gmail.com>
Authored: Sun Sep 11 18:23:17 2016 +0100
Committer: Keith Wall <ke...@gmail.com>
Committed: Sun Sep 11 18:23:23 2016 +0100

----------------------------------------------------------------------
 qpid/client.py                |  9 ++--
 qpid/connection08.py          |  3 ++
 qpid/exceptions.py            |  1 +
 qpid/peer.py                  | 17 +++++--
 qpid_tests/broker_0_9/echo.py | 95 ++++++++++++++++++--------------------
 5 files changed, 68 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-python/blob/5c237dc3/qpid/client.py
----------------------------------------------------------------------
diff --git a/qpid/client.py b/qpid/client.py
index 5c68789..91b2721 100644
--- a/qpid/client.py
+++ b/qpid/client.py
@@ -184,12 +184,11 @@ class ClientDelegate(Delegate):
     msg.secure_ok(response=self.client.sasl.response(msg.challenge))
 
   def connection_tune(self, ch, msg):
+    tune_params = dict(zip(('channel_max', 'frame_max', 'heartbeat'), (msg.frame.args)))
     if self.client.tune_params:
-      #todo: just override the params, i.e. don't require them
-      #      all to be included in tune_params
-      msg.tune_ok(**self.client.tune_params)
-    else:
-      msg.tune_ok(*msg.frame.args)
+      tune_params.update(self.client.tune_params)
+    msg.tune_ok(**tune_params)
+    self.client.tune_params = tune_params
     self.client.started.set()
 
   def message_transfer(self, ch, msg):

http://git-wip-us.apache.org/repos/asf/qpid-python/blob/5c237dc3/qpid/connection08.py
----------------------------------------------------------------------
diff --git a/qpid/connection08.py b/qpid/connection08.py
index 5f9f71a..54d7f88 100644
--- a/qpid/connection08.py
+++ b/qpid/connection08.py
@@ -157,10 +157,13 @@ class FramingError(Exception):
 
 class Connection:
 
+  AMQP_HEADER_SIZE = 8
+
   def __init__(self, io, spec):
     self.codec = codec.Codec(io, spec)
     self.spec = spec
     self.FRAME_END = self.spec.constants.byname["frame_end"].id
+    self.FRAME_MIN_SIZE = spec.constants.byname['frame_min_size'].id
     self.write = getattr(self, "write_%s_%s" % (self.spec.major, self.spec.minor))
     self.read = getattr(self, "read_%s_%s" % (self.spec.major, self.spec.minor))
     self.io = io

http://git-wip-us.apache.org/repos/asf/qpid-python/blob/5c237dc3/qpid/exceptions.py
----------------------------------------------------------------------
diff --git a/qpid/exceptions.py b/qpid/exceptions.py
index 2bd80b7..b6ff68b 100644
--- a/qpid/exceptions.py
+++ b/qpid/exceptions.py
@@ -20,3 +20,4 @@
 class Closed(Exception): pass
 class Timeout(Exception): pass
 class VersionError(Exception): pass
+class ContentError(Exception): pass

http://git-wip-us.apache.org/repos/asf/qpid-python/blob/5c237dc3/qpid/peer.py
----------------------------------------------------------------------
diff --git a/qpid/peer.py b/qpid/peer.py
index fcad0f3..15f2abf 100644
--- a/qpid/peer.py
+++ b/qpid/peer.py
@@ -31,7 +31,7 @@ from queue import Queue, Closed as QueueClosed
 from content import Content
 from cStringIO import StringIO
 from time import time
-from exceptions import Closed, Timeout
+from exceptions import Closed, Timeout, ContentError
 from logging import getLogger
 
 log = getLogger("qpid.peer")
@@ -264,9 +264,20 @@ class Channel:
     self.write(header)
     for child in content.children:
       self.write_content(klass, child)
-    # should split up if content.body exceeds max frame size
     if content.body:
-      self.write(Body(content.body))
+      if not isinstance(content.body, (basestring, buffer)):
+        # The 0-8..0-91 client does not support the messages bodies apart from string/buffer - fail early
+        # if other type
+        raise ContentError("Content body must be string or buffer, not a %s" % type(content.body))
+      frame_max = self.client.tune_params['frame_max'] - self.client.conn.AMQP_HEADER_SIZE
+      start = 0
+      remaining_size = len(content.body)
+      while remaining_size > 0:
+        chunk_size = min(frame_max, remaining_size)
+        chunk = content.body[start:start + chunk_size]
+        self.write(Body(chunk))
+        start += chunk_size
+        remaining_size -= chunk_size
 
   def receive(self, frame, work):
     if isinstance(frame, Method):

http://git-wip-us.apache.org/repos/asf/qpid-python/blob/5c237dc3/qpid_tests/broker_0_9/echo.py
----------------------------------------------------------------------
diff --git a/qpid_tests/broker_0_9/echo.py b/qpid_tests/broker_0_9/echo.py
index a883568..b93a094 100644
--- a/qpid_tests/broker_0_9/echo.py
+++ b/qpid_tests/broker_0_9/echo.py
@@ -19,58 +19,55 @@
 
 from qpid.testlib import TestBase
 from qpid.content import Content
-import qpid.client
-
+from qpid.harness import Skipped
 
+import qpid.client
 
 class EchoTests(TestBase):
-    """Verify that messages can be sent and received retaining fidelity"""
-
-    def test_small_message(self):
-
-      channel = self.channel
-
-      self.queue_declare(queue="q")
-
-      channel.tx_select()
-      consumer = self.consume("q", no_ack=False)
-
-      body = self.uniqueString()
-      channel.basic_publish(
-          content=Content(body),
-          routing_key="q")
-      channel.tx_commit()
-
-      msg = consumer.get(timeout=1)
-      channel.basic_ack(delivery_tag=msg.delivery_tag)
-      channel.tx_commit()
-      self.assertEqual(body, msg.content.body)
-
-    def test_large_message(self):
-
-      channel = self.channel
-
-      self.queue_declare(queue="q")
-
-      channel.tx_select()
-      consumer = self.consume("q", no_ack=False)
-
-      # This is default maximum frame size supported by the Java Broker.  Python
-      # currently does not support framing of oversized messages in multiple frames.
-      body = self.randomLongString()
-      channel.basic_publish(
-        content=Content(body),
-        routing_key="q")
-      channel.tx_commit()
-
-      msg = consumer.get(timeout=1)
-      channel.basic_ack(delivery_tag=msg.delivery_tag)
-      channel.tx_commit()
-      self.assertEqual(len(body), len(msg.content.body))
-      self.assertEqual(body, msg.content.body)
-
+  """Verify that messages can be sent and received retaining fidelity"""
+
+  def setUp(self):
+    super(EchoTests, self).setUp()
+    self.frame_max_size = self.client.tune_params['frame_max']
+    self.assertTrue(self.frame_max_size >= self.client.conn.FRAME_MIN_SIZE)
+
+  def test_empty_message(self):
+    body = ''
+    self.echo_message(body)
+
+  def test_small_message(self):
+    body = self.uniqueString()
+    self.echo_message(body)
+
+  def test_largest_single_frame_message(self):
+    max_size_within_single_frame = self.frame_max_size - self.client.conn.AMQP_HEADER_SIZE
+    body = self.randomLongString(max_size_within_single_frame)
+    self.echo_message(body)
+
+  def test_multiple_frame_message(self):
+    size = self.frame_max_size * 2 - (self.client.conn.FRAME_MIN_SIZE / 2)
+    body = self.randomLongString(size)
+    self.echo_message(body)
+
+  def echo_message(self, body):
+    channel = self.channel
+    self.queue_declare(queue="q")
+    channel.tx_select()
+    consumer = self.consume("q", no_ack=False)
+    channel.basic_publish(
+            content=Content(body),
+            routing_key="q")
+    channel.tx_commit()
+    msg = consumer.get(timeout=1)
+    channel.basic_ack(delivery_tag=msg.delivery_tag)
+    channel.tx_commit()
+    self.assertEqual(len(body), len(msg.content.body))
+    self.assertEqual(body, msg.content.body)
 
     def test_large_message_received_in_many_content_frames(self):
+      if self.client.conn.FRAME_MIN_SIZE == self.frame_max_size:
+        raise Skipped("Test requires that frame_max_size (%d) exceeds frame_min_size (%d)" % (self.frame_max_size, self.frame_max_size))
+
       channel = self.channel
 
       queue_name = "q"
@@ -89,7 +86,7 @@ class EchoTests(TestBase):
         # Create a second connection with minimum framesize.  The Broker will then be forced to chunk
         # the content in order to send it to us.
         consuming_client = qpid.client.Client(self.config.broker.host, self.config.broker.port)
-        tune_params = { "channel_max" : 256, "frame_max" : 4096 }
+        tune_params = { "frame_max" : self.client.conn.FRAME_MIN_SIZE }
         consuming_client.start(username = self.config.broker.user, password = self.config.broker.password, tune_params = tune_params)
 
         consuming_channel = consuming_client.channel(1)
@@ -109,7 +106,7 @@ class EchoTests(TestBase):
           consuming_client.close()
 
     def test_commit_ok_possibly_interleaved_with_message_delivery(self):
-      """This test exposes an defect on the Java Broker (QPID-6094).  The Java Client
+      """This test exposes an defect on the Java Broker (QPID-6094).  The Java Broker (0.32 and below)
          can contravene the AMQP spec by sending other frames between the message header/frames.
          As this is a long standing defect in the Java Broker, QPID-6082 changed
          the Python client to allow it to tolerate such illegal interleaving.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org