You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/09/12 15:25:49 UTC

svn commit: r1624545 - in /qpid/trunk/qpid: java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ python/qpid/ tests/src/py/qpid_tests/broker_0_9/

Author: kwall
Date: Fri Sep 12 13:25:49 2014
New Revision: 1624545

URL: http://svn.apache.org/r1624545
Log:
QPID-6081, QPID-6082: [Python Client Tests] Add python tests for verifying the receipt of large messages occupying more than one frame (08-091)

Added:
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/echo.py
Modified:
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
    qpid/trunk/qpid/python/qpid/testlib.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py

Modified: qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java?rev=1624545&r1=1624544&r2=1624545&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java Fri Sep 12 13:25:49 2014
@@ -225,9 +225,9 @@ class ProtocolOutputConverterImpl implem
     private class MessageContentSourceBody implements AMQBody
     {
         public static final byte TYPE = 3;
-        private int _length;
-        private MessageContentSource _message;
-        private int _offset;
+        private final int _length;
+        private final MessageContentSource _message;
+        private final int _offset;
 
         public MessageContentSourceBody(MessageContentSource message, int offset, int length)
         {
@@ -269,6 +269,13 @@ class ProtocolOutputConverterImpl implem
         {
             throw new UnsupportedOperationException();
         }
+
+        @Override
+        public String toString()
+        {
+            return "[" + getClass().getSimpleName() + " offset: " + _offset + ", length: " + _length + "]";
+        }
+
     }
 
     public long writeGetOk(final ServerMessage msg,

Modified: qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/testlib.py?rev=1624545&r1=1624544&r2=1624545&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ qpid/trunk/qpid/python/qpid/testlib.py Fri Sep 12 13:25:49 2014
@@ -21,6 +21,9 @@
 # Support library for qpid python tests.
 #
 
+import string
+import random
+
 import unittest, traceback, socket
 import qpid.client, qmf.console
 import Queue
@@ -77,8 +80,8 @@ class TestBase(unittest.TestCase):
         """Create a new connction, return the Client object"""
         host = host or self.config.broker.host
         port = port or self.config.broker.port or 5672
-        user = user or "guest"
-        password = password or "guest"
+        user = user or self.config.broker.user or "guest"
+        password = password or self.config.broker.password or "guest"
         client = qpid.client.Client(host, port)
         try:
             if client.spec.major == 8 and client.spec.minor == 0:
@@ -114,9 +117,14 @@ class TestBase(unittest.TestCase):
         if not "uniqueCounter" in dir(self): self.uniqueCounter = 1;
         return "Test Message " + str(self.uniqueCounter)
 
-    def consume(self, queueName):
+    def randomLongString(self, length=65535):
+      body = ''.join(random.choice(string.ascii_uppercase) for _ in range(length))
+      return body
+
+    def consume(self, queueName, no_ack=True):
         """Consume from named queue returns the Queue object."""
-        reply = self.channel.basic_consume(queue=queueName, no_ack=True)
+
+        reply = self.channel.basic_consume(queue=queueName, no_ack=no_ack)
         return self.client.queue(reply.consumer_tag)
 
     def subscribe(self, channel=None, **keys):

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py?rev=1624545&r1=1624544&r2=1624545&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py Fri Sep 12 13:25:49 2014
@@ -19,4 +19,4 @@
 # under the License.
 #
 
-import query, queue, messageheader
+import query, queue, messageheader, echo

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/echo.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/echo.py?rev=1624545&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/echo.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/echo.py Fri Sep 12 13:25:49 2014
@@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.testlib import TestBase
+from qpid.content import Content
+import qpid.client
+
+
+
+class EchoTests(TestBase):
+    """Verify that messages can be sent and received retaining fidelity"""
+
+    def test_small_message(self):
+
+      channel = self.channel
+
+      self.queue_declare(queue="q")
+
+      channel.tx_select()
+      consumer = self.consume("q", no_ack=False)
+
+      body = self.uniqueString()
+      channel.basic_publish(
+          content=Content(body),
+          routing_key="q")
+      channel.tx_commit()
+
+      msg = consumer.get(timeout=1)
+      channel.basic_ack(delivery_tag=msg.delivery_tag)
+      channel.tx_commit()
+      self.assertEqual(body, msg.content.body)
+
+    def test_large_message(self):
+
+      channel = self.channel
+
+      self.queue_declare(queue="q")
+
+      channel.tx_select()
+      consumer = self.consume("q", no_ack=False)
+
+      # This is the default maximum frame size supported by the Java Broker.  Python
+      # currently does not support splitting oversized messages across multiple frames.
+      body = self.randomLongString()
+      channel.basic_publish(
+        content=Content(body),
+        routing_key="q")
+      channel.tx_commit()
+
+      msg = consumer.get(timeout=1)
+      channel.basic_ack(delivery_tag=msg.delivery_tag)
+      channel.tx_commit()
+      self.assertEqual(len(body), len(msg.content.body))
+      self.assertEqual(body, msg.content.body)
+
+
+    def test_large_message_received_in_many_content_frames(self):
+      channel = self.channel
+
+      queue_name = "q"
+      self.queue_declare(queue=queue_name)
+
+      channel.tx_select()
+
+      body = self.randomLongString()
+      channel.basic_publish(
+        content=Content(body),
+        routing_key=queue_name)
+      channel.tx_commit()
+
+      consuming_client = None
+      try:
+        # Create a second connection with minimum framesize.  The Broker will then be forced to chunk
+        # the content in order to send it to us.
+        consuming_client = qpid.client.Client(self.config.broker.host, self.config.broker.port)
+        tune_params = { "channel_max" : 256, "frame_max" : 4096 }
+        consuming_client.start("\x00" + self.config.broker.user + "\x00" + self.config.broker.password, mechanism="PLAIN", tune_params = tune_params)
+
+        consuming_channel = consuming_client.channel(1)
+        consuming_channel.channel_open()
+        consuming_channel.tx_select()
+
+        consumer_reply = consuming_channel.basic_consume(queue=queue_name, no_ack=False)
+        consumer = consuming_client.queue(consumer_reply.consumer_tag)
+        msg = consumer.get(timeout=1)
+        consuming_channel.basic_ack(delivery_tag=msg.delivery_tag)
+        consuming_channel.tx_commit()
+
+        self.assertEqual(len(body), len(msg.content.body))
+        self.assertEqual(body, msg.content.body)
+      finally:
+        if consuming_client:
+          consuming_client.close()
+
+    def test_commit_ok_possibly_interleaved_with_message_delivery(self):
+      """This test exposes a defect in the Java Broker (QPID-6094).  The Java Broker
+         can contravene the AMQP spec by sending other frames between a message's header and content frames.
+         As this is a long-standing defect in the Java Broker, QPID-6082 changed
+         the Python client to allow it to tolerate such illegal interleaving.
+         """
+      channel = self.channel
+
+      queue_name = "q"
+      self.queue_declare(queue=queue_name)
+
+      count = 25
+      channel.basic_qos(prefetch_count=count)
+
+      channel.tx_select()
+
+      bodies = []
+      for i in range(count):
+        body = self.randomLongString()
+        bodies.append(body)
+        channel.basic_publish(
+          content=Content(bodies[i]),
+          routing_key=queue_name)
+        channel.tx_commit()
+
+      # Start consuming.  Prefetch means the Broker will start sending us
+      # all the messages, which then accumulate in the client.
+      consumer = self.consume("q", no_ack=False)
+
+      # Get and ack/commit the first message
+      msg = consumer.get(timeout=1)
+      channel.basic_ack(delivery_tag=msg.delivery_tag)
+      channel.tx_commit()
+      # In the problematic case, the Broker interleaves our commit-ok response among the content
+      # frames of a message.  QPID-6082 means the Python client now tolerates this
+      # problem and all messages should arrive correctly.
+
+      expectedBody = bodies[0]
+      self.assertEqual(len(expectedBody), len(msg.content.body))
+      self.assertEqual(expectedBody, msg.content.body)
+
+      for i in range(1, len(bodies)):
+        msg = consumer.get(timeout=5)
+
+        expectedBody = bodies[i]
+        self.assertEqual(len(expectedBody), len(msg.content.body))
+        self.assertEqual(expectedBody, msg.content.body)
+
+



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org