You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/09/11 17:29:13 UTC
qpid-python git commit: QPID-7423: [Python Client 0-8..0-91] Chunk
large content into many frames sized according to the frame size agreed by
connection tune.
Repository: qpid-python
Updated Branches:
refs/heads/master 586d41988 -> 5c237dc3c
QPID-7423: [Python Client 0-8..0-91] Chunk large content into many frames sized according to the frame size agreed by connection tune.
* Added supporting tests too.
* The AMQP 0-10 code paths are unchanged by this commit.
Project: http://git-wip-us.apache.org/repos/asf/qpid-python/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-python/commit/5c237dc3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-python/tree/5c237dc3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-python/diff/5c237dc3
Branch: refs/heads/master
Commit: 5c237dc3c4d8ef9ed9f93b5b9ecebb64c41b79c7
Parents: 586d419
Author: Keith Wall <ke...@gmail.com>
Authored: Sun Sep 11 18:23:17 2016 +0100
Committer: Keith Wall <ke...@gmail.com>
Committed: Sun Sep 11 18:23:23 2016 +0100
----------------------------------------------------------------------
qpid/client.py | 9 ++--
qpid/connection08.py | 3 ++
qpid/exceptions.py | 1 +
qpid/peer.py | 17 +++++--
qpid_tests/broker_0_9/echo.py | 95 ++++++++++++++++++--------------------
5 files changed, 68 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-python/blob/5c237dc3/qpid/client.py
----------------------------------------------------------------------
diff --git a/qpid/client.py b/qpid/client.py
index 5c68789..91b2721 100644
--- a/qpid/client.py
+++ b/qpid/client.py
@@ -184,12 +184,11 @@ class ClientDelegate(Delegate):
msg.secure_ok(response=self.client.sasl.response(msg.challenge))
def connection_tune(self, ch, msg):
+ tune_params = dict(zip(('channel_max', 'frame_max', 'heartbeat'), (msg.frame.args)))
if self.client.tune_params:
- #todo: just override the params, i.e. don't require them
- # all to be included in tune_params
- msg.tune_ok(**self.client.tune_params)
- else:
- msg.tune_ok(*msg.frame.args)
+ tune_params.update(self.client.tune_params)
+ msg.tune_ok(**tune_params)
+ self.client.tune_params = tune_params
self.client.started.set()
def message_transfer(self, ch, msg):
http://git-wip-us.apache.org/repos/asf/qpid-python/blob/5c237dc3/qpid/connection08.py
----------------------------------------------------------------------
diff --git a/qpid/connection08.py b/qpid/connection08.py
index 5f9f71a..54d7f88 100644
--- a/qpid/connection08.py
+++ b/qpid/connection08.py
@@ -157,10 +157,13 @@ class FramingError(Exception):
class Connection:
+ AMQP_HEADER_SIZE = 8
+
def __init__(self, io, spec):
self.codec = codec.Codec(io, spec)
self.spec = spec
self.FRAME_END = self.spec.constants.byname["frame_end"].id
+ self.FRAME_MIN_SIZE = spec.constants.byname['frame_min_size'].id
self.write = getattr(self, "write_%s_%s" % (self.spec.major, self.spec.minor))
self.read = getattr(self, "read_%s_%s" % (self.spec.major, self.spec.minor))
self.io = io
http://git-wip-us.apache.org/repos/asf/qpid-python/blob/5c237dc3/qpid/exceptions.py
----------------------------------------------------------------------
diff --git a/qpid/exceptions.py b/qpid/exceptions.py
index 2bd80b7..b6ff68b 100644
--- a/qpid/exceptions.py
+++ b/qpid/exceptions.py
@@ -20,3 +20,4 @@
class Closed(Exception): pass
class Timeout(Exception): pass
class VersionError(Exception): pass
+class ContentError(Exception): pass
http://git-wip-us.apache.org/repos/asf/qpid-python/blob/5c237dc3/qpid/peer.py
----------------------------------------------------------------------
diff --git a/qpid/peer.py b/qpid/peer.py
index fcad0f3..15f2abf 100644
--- a/qpid/peer.py
+++ b/qpid/peer.py
@@ -31,7 +31,7 @@ from queue import Queue, Closed as QueueClosed
from content import Content
from cStringIO import StringIO
from time import time
-from exceptions import Closed, Timeout
+from exceptions import Closed, Timeout, ContentError
from logging import getLogger
log = getLogger("qpid.peer")
@@ -264,9 +264,20 @@ class Channel:
self.write(header)
for child in content.children:
self.write_content(klass, child)
- # should split up if content.body exceeds max frame size
if content.body:
- self.write(Body(content.body))
+ if not isinstance(content.body, (basestring, buffer)):
+ # The 0-8..0-91 client only supports message bodies that are a string or a buffer - fail early
+ # if the body is of any other type
+ raise ContentError("Content body must be string or buffer, not a %s" % type(content.body))
+ frame_max = self.client.tune_params['frame_max'] - self.client.conn.AMQP_HEADER_SIZE
+ start = 0
+ remaining_size = len(content.body)
+ while remaining_size > 0:
+ chunk_size = min(frame_max, remaining_size)
+ chunk = content.body[start:start + chunk_size]
+ self.write(Body(chunk))
+ start += chunk_size
+ remaining_size -= chunk_size
def receive(self, frame, work):
if isinstance(frame, Method):
http://git-wip-us.apache.org/repos/asf/qpid-python/blob/5c237dc3/qpid_tests/broker_0_9/echo.py
----------------------------------------------------------------------
diff --git a/qpid_tests/broker_0_9/echo.py b/qpid_tests/broker_0_9/echo.py
index a883568..b93a094 100644
--- a/qpid_tests/broker_0_9/echo.py
+++ b/qpid_tests/broker_0_9/echo.py
@@ -19,58 +19,55 @@
from qpid.testlib import TestBase
from qpid.content import Content
-import qpid.client
-
+from qpid.harness import Skipped
+import qpid.client
class EchoTests(TestBase):
- """Verify that messages can be sent and received retaining fidelity"""
-
- def test_small_message(self):
-
- channel = self.channel
-
- self.queue_declare(queue="q")
-
- channel.tx_select()
- consumer = self.consume("q", no_ack=False)
-
- body = self.uniqueString()
- channel.basic_publish(
- content=Content(body),
- routing_key="q")
- channel.tx_commit()
-
- msg = consumer.get(timeout=1)
- channel.basic_ack(delivery_tag=msg.delivery_tag)
- channel.tx_commit()
- self.assertEqual(body, msg.content.body)
-
- def test_large_message(self):
-
- channel = self.channel
-
- self.queue_declare(queue="q")
-
- channel.tx_select()
- consumer = self.consume("q", no_ack=False)
-
- # This is default maximum frame size supported by the Java Broker. Python
- # currently does not support framing of oversized messages in multiple frames.
- body = self.randomLongString()
- channel.basic_publish(
- content=Content(body),
- routing_key="q")
- channel.tx_commit()
-
- msg = consumer.get(timeout=1)
- channel.basic_ack(delivery_tag=msg.delivery_tag)
- channel.tx_commit()
- self.assertEqual(len(body), len(msg.content.body))
- self.assertEqual(body, msg.content.body)
-
+ """Verify that messages can be sent and received retaining fidelity"""
+
+ def setUp(self):
+ super(EchoTests, self).setUp()
+ self.frame_max_size = self.client.tune_params['frame_max']
+ self.assertTrue(self.frame_max_size >= self.client.conn.FRAME_MIN_SIZE)
+
+ def test_empty_message(self):
+ body = ''
+ self.echo_message(body)
+
+ def test_small_message(self):
+ body = self.uniqueString()
+ self.echo_message(body)
+
+ def test_largest_single_frame_message(self):
+ max_size_within_single_frame = self.frame_max_size - self.client.conn.AMQP_HEADER_SIZE
+ body = self.randomLongString(max_size_within_single_frame)
+ self.echo_message(body)
+
+ def test_multiple_frame_message(self):
+ size = self.frame_max_size * 2 - (self.client.conn.FRAME_MIN_SIZE / 2)
+ body = self.randomLongString(size)
+ self.echo_message(body)
+
+ def echo_message(self, body):
+ channel = self.channel
+ self.queue_declare(queue="q")
+ channel.tx_select()
+ consumer = self.consume("q", no_ack=False)
+ channel.basic_publish(
+ content=Content(body),
+ routing_key="q")
+ channel.tx_commit()
+ msg = consumer.get(timeout=1)
+ channel.basic_ack(delivery_tag=msg.delivery_tag)
+ channel.tx_commit()
+ self.assertEqual(len(body), len(msg.content.body))
+ self.assertEqual(body, msg.content.body)
def test_large_message_received_in_many_content_frames(self):
+ if self.client.conn.FRAME_MIN_SIZE == self.frame_max_size:
+ raise Skipped("Test requires that frame_max_size (%d) exceeds frame_min_size (%d)" % (self.frame_max_size, self.frame_max_size))
+
channel = self.channel
queue_name = "q"
@@ -89,7 +86,7 @@ class EchoTests(TestBase):
# Create a second connection with minimum framesize. The Broker will then be forced to chunk
# the content in order to send it to us.
consuming_client = qpid.client.Client(self.config.broker.host, self.config.broker.port)
- tune_params = { "channel_max" : 256, "frame_max" : 4096 }
+ tune_params = { "frame_max" : self.client.conn.FRAME_MIN_SIZE }
consuming_client.start(username = self.config.broker.user, password = self.config.broker.password, tune_params = tune_params)
consuming_channel = consuming_client.channel(1)
@@ -109,7 +106,7 @@ class EchoTests(TestBase):
consuming_client.close()
def test_commit_ok_possibly_interleaved_with_message_delivery(self):
- """This test exposes an defect on the Java Broker (QPID-6094). The Java Client
+ """This test exposes a defect on the Java Broker (QPID-6094). The Java Broker (0.32 and below)
can contravene the AMQP spec by sending other frames between the message header/frames.
As this is a long standing defect in the Java Broker, QPID-6082 changed
the Python client to allow it to tolerate such illegal interleaving.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org