You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/09/12 15:25:49 UTC
svn commit: r1624545 - in /qpid/trunk/qpid:
java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/
python/qpid/ tests/src/py/qpid_tests/broker_0_9/
Author: kwall
Date: Fri Sep 12 13:25:49 2014
New Revision: 1624545
URL: http://svn.apache.org/r1624545
Log:
QPID-6081, QPID-6082: [Python Client Tests] Add python tests for verifying the receipt of large messages occupying more than one frame (08-091)
Added:
qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/echo.py
Modified:
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
qpid/trunk/qpid/python/qpid/testlib.py
qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py
Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java?rev=1624545&r1=1624544&r2=1624545&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java Fri Sep 12 13:25:49 2014
@@ -225,9 +225,9 @@ class ProtocolOutputConverterImpl implem
private class MessageContentSourceBody implements AMQBody
{
public static final byte TYPE = 3;
- private int _length;
- private MessageContentSource _message;
- private int _offset;
+ private final int _length;
+ private final MessageContentSource _message;
+ private final int _offset;
public MessageContentSourceBody(MessageContentSource message, int offset, int length)
{
@@ -269,6 +269,13 @@ class ProtocolOutputConverterImpl implem
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public String toString()
+ {
+ return "[" + getClass().getSimpleName() + " offset: " + _offset + ", length: " + _length + "]";
+ }
+
}
public long writeGetOk(final ServerMessage msg,
Modified: qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/testlib.py?rev=1624545&r1=1624544&r2=1624545&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ qpid/trunk/qpid/python/qpid/testlib.py Fri Sep 12 13:25:49 2014
@@ -21,6 +21,9 @@
# Support library for qpid python tests.
#
+import string
+import random
+
import unittest, traceback, socket
import qpid.client, qmf.console
import Queue
@@ -77,8 +80,8 @@ class TestBase(unittest.TestCase):
"""Create a new connction, return the Client object"""
host = host or self.config.broker.host
port = port or self.config.broker.port or 5672
- user = user or "guest"
- password = password or "guest"
+ user = user or self.config.broker.user or "guest"
+ password = password or self.config.broker.password or "guest"
client = qpid.client.Client(host, port)
try:
if client.spec.major == 8 and client.spec.minor == 0:
@@ -114,9 +117,14 @@ class TestBase(unittest.TestCase):
if not "uniqueCounter" in dir(self): self.uniqueCounter = 1;
return "Test Message " + str(self.uniqueCounter)
- def consume(self, queueName):
+ def randomLongString(self, length=65535):
+ body = ''.join(random.choice(string.ascii_uppercase) for _ in range(length))
+ return body
+
+ def consume(self, queueName, no_ack=True):
"""Consume from named queue returns the Queue object."""
- reply = self.channel.basic_consume(queue=queueName, no_ack=True)
+
+ reply = self.channel.basic_consume(queue=queueName, no_ack=no_ack)
return self.client.queue(reply.consumer_tag)
def subscribe(self, channel=None, **keys):
Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py?rev=1624545&r1=1624544&r2=1624545&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py Fri Sep 12 13:25:49 2014
@@ -19,4 +19,4 @@
# under the License.
#
-import query, queue, messageheader
+import query, queue, messageheader, echo
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/echo.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/echo.py?rev=1624545&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/echo.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/echo.py Fri Sep 12 13:25:49 2014
@@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.testlib import TestBase
+from qpid.content import Content
+import qpid.client
+
+
+
+class EchoTests(TestBase):
+ """Verify that messages can be sent and received retaining fidelity"""
+
+ def test_small_message(self):
+
+ channel = self.channel
+
+ self.queue_declare(queue="q")
+
+ channel.tx_select()
+ consumer = self.consume("q", no_ack=False)
+
+ body = self.uniqueString()
+ channel.basic_publish(
+ content=Content(body),
+ routing_key="q")
+ channel.tx_commit()
+
+ msg = consumer.get(timeout=1)
+ channel.basic_ack(delivery_tag=msg.delivery_tag)
+ channel.tx_commit()
+ self.assertEqual(body, msg.content.body)
+
+ def test_large_message(self):
+
+ channel = self.channel
+
+ self.queue_declare(queue="q")
+
+ channel.tx_select()
+ consumer = self.consume("q", no_ack=False)
+
+ # This is the default maximum frame size supported by the Java Broker. Python
+ # currently does not support framing of oversized messages in multiple frames.
+ body = self.randomLongString()
+ channel.basic_publish(
+ content=Content(body),
+ routing_key="q")
+ channel.tx_commit()
+
+ msg = consumer.get(timeout=1)
+ channel.basic_ack(delivery_tag=msg.delivery_tag)
+ channel.tx_commit()
+ self.assertEqual(len(body), len(msg.content.body))
+ self.assertEqual(body, msg.content.body)
+
+
+ def test_large_message_received_in_many_content_frames(self):
+ channel = self.channel
+
+ queue_name = "q"
+ self.queue_declare(queue=queue_name)
+
+ channel.tx_select()
+
+ body = self.randomLongString()
+ channel.basic_publish(
+ content=Content(body),
+ routing_key=queue_name)
+ channel.tx_commit()
+
+ consuming_client = None
+ try:
+ # Create a second connection with minimum framesize. The Broker will then be forced to chunk
+ # the content in order to send it to us.
+ consuming_client = qpid.client.Client(self.config.broker.host, self.config.broker.port)
+ tune_params = { "channel_max" : 256, "frame_max" : 4096 }
+ consuming_client.start("\x00" + self.config.broker.user + "\x00" + self.config.broker.password, mechanism="PLAIN", tune_params = tune_params)
+
+ consuming_channel = consuming_client.channel(1)
+ consuming_channel.channel_open()
+ consuming_channel.tx_select()
+
+ consumer_reply = consuming_channel.basic_consume(queue=queue_name, no_ack=False)
+ consumer = consuming_client.queue(consumer_reply.consumer_tag)
+ msg = consumer.get(timeout=1)
+ consuming_channel.basic_ack(delivery_tag=msg.delivery_tag)
+ consuming_channel.tx_commit()
+
+ self.assertEqual(len(body), len(msg.content.body))
+ self.assertEqual(body, msg.content.body)
+ finally:
+ if consuming_client:
+ consuming_client.close()
+
+ def test_commit_ok_possibly_interleaved_with_message_delivery(self):
+ """This test exposes a defect in the Java Broker (QPID-6094). The Java Broker
+ can contravene the AMQP spec by sending other frames between the message header/frames.
+ As this is a long standing defect in the Java Broker, QPID-6082 changed
+ the Python client to allow it to tolerate such illegal interleaving.
+ """
+ channel = self.channel
+
+ queue_name = "q"
+ self.queue_declare(queue=queue_name)
+
+ count = 25
+ channel.basic_qos(prefetch_count=count)
+
+ channel.tx_select()
+
+ bodies = []
+ for i in range(count):
+ body = self.randomLongString()
+ bodies.append(body)
+ channel.basic_publish(
+ content=Content(bodies[i]),
+ routing_key=queue_name)
+ channel.tx_commit()
+
+ # Start consuming. Prefetch will mean the Broker will start to send us
+ # all the messages, accumulating them in the client.
+ consumer = self.consume("q", no_ack=False)
+
+ # Get and ack/commit the first message
+ msg = consumer.get(timeout=1)
+ channel.basic_ack(delivery_tag=msg.delivery_tag)
+ channel.tx_commit()
+ # In the problematic case, the Broker interleaves our commit-ok response amongst the content
+ # frames of the message. QPID-6082 means the Python client now tolerates this
+ # problem and all messages should arrive correctly.
+
+ expectedBody = bodies[0]
+ self.assertEqual(len(expectedBody), len(msg.content.body))
+ self.assertEqual(expectedBody, msg.content.body)
+
+ for i in range(1, len(bodies)):
+ msg = consumer.get(timeout=5)
+
+ expectedBody = bodies[i]
+ self.assertEqual(len(expectedBody), len(msg.content.body))
+ self.assertEqual(expectedBody, msg.content.body)
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org