You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2015/10/14 17:56:10 UTC

svn commit: r1708636 - in /qpid/java/trunk: broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ common/src/main/java/org/apache/qpid/transport/codec/ systests/src/test/java/org/apache/qpid/test/unit/basic/ test-profiles/

Author: lquack
Date: Wed Oct 14 15:56:10 2015
New Revision: 1708636

URL: http://svn.apache.org/viewvc?rev=1708636&view=rev
Log:
QPID-6786: [Java Broker] Fix handling of large headers in 0-10

A single ServerDecoder instance was being reused, but its internal _bufferIndex was not reset between uses. Fix by constructing a new decoder for each use instead of reusing one.

Modified:
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
    qpid/java/trunk/test-profiles/CPPExcludes
    qpid/java/trunk/test-profiles/Java010Excludes
    qpid/java/trunk/test-profiles/JavaPre010Excludes

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java?rev=1708636&r1=1708635&r2=1708636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java Wed Oct 14 15:56:10 2015
@@ -248,8 +248,7 @@ public class MessageMetaData_0_10 implem
     {
         public MessageMetaData_0_10 createMetaData(QpidByteBuffer buf)
         {
-            ServerDecoder decoder = new ServerDecoder();
-            decoder.init(Collections.singletonList(buf));
+            ServerDecoder decoder = new ServerDecoder(Collections.singletonList(buf));
 
             long arrivalTime = decoder.readInt64();
             int bodySize = decoder.readInt32();

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java?rev=1708636&r1=1708635&r2=1708636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java Wed Oct 14 15:56:10 2015
@@ -65,7 +65,6 @@ public class ServerAssembler
     private final Map<Integer, Method> _incompleteMethodMap = new HashMap<>();
 
     private final Map<Integer,List<ServerFrame>> _segments;
-    private final ServerDecoder _decoder = new ServerDecoder();
 
     public ServerAssembler(ServerConnection connection)
     {
@@ -183,8 +182,7 @@ public class ServerAssembler
 
     private void assemble(ServerFrame frame, List<QpidByteBuffer> frameBuffers)
     {
-        ServerDecoder dec = _decoder;
-        dec.init(frameBuffers);
+        ServerDecoder dec = new ServerDecoder(frameBuffers);
 
         int channel = frame.getChannel();
         Method command;
@@ -259,7 +257,6 @@ public class ServerAssembler
                 throw new IllegalStateException("unknown frame type: " + frame.getType());
         }
 
-        dec.releaseBuffer();
         for(QpidByteBuffer buf : frameBuffers)
         {
             buf.dispose();

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java?rev=1708636&r1=1708635&r2=1708636&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDecoder.java Wed Oct 14 15:56:10 2015
@@ -25,15 +25,15 @@ import java.util.List;
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.transport.codec.AbstractDecoder;
 
-public final class ServerDecoder extends AbstractDecoder
+final class ServerDecoder extends AbstractDecoder
 {
-    private List<QpidByteBuffer> _underlying;
+    private final List<QpidByteBuffer> _underlying;
     private int _bufferIndex;
 
-
-    public void init(List<QpidByteBuffer> in)
+    ServerDecoder(List<QpidByteBuffer> in)
     {
         _underlying = in;
+        _bufferIndex = 0;
     }
 
     private void advanceIfNecessary()
@@ -111,12 +111,6 @@ public final class ServerDecoder extends
         return _underlying.get(_bufferIndex);
     }
 
-
-    public void releaseBuffer()
-    {
-        _underlying = null;
-    }
-
     protected byte doGet()
     {
         return getBuffer(1).get();
@@ -191,7 +185,7 @@ public final class ServerDecoder extends
 		return getBuffer(1).get();
 	}
 
-	public byte[] readReaminingBytes()
+	public byte[] readRemainingBytes()
 	{
       byte[] result = new byte[available()];
       get(result);

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java?rev=1708636&r1=1708635&r2=1708636&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/BBDecoder.java Wed Oct 14 15:56:10 2015
@@ -135,7 +135,7 @@ public final class BBDecoder extends Abs
 		return in.get();
 	}
 
-	public byte[] readReaminingBytes()
+	public byte[] readRemainingBytes()
 	{
       byte[] result = new byte[in.limit() - in.position()];
       get(result);

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java?rev=1708636&r1=1708635&r2=1708636&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/codec/Decoder.java Wed Oct 14 15:56:10 2015
@@ -271,7 +271,7 @@ public interface Decoder
      * 
      * @return the remaining bytes on the underlying buffer.
      */
-    byte[] readReaminingBytes ();
+    byte[] readRemainingBytes();
     
     /**
      * Reads the given number of bytes.

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java?rev=1708636&r1=1708635&r2=1708636&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java Wed Oct 14 15:56:10 2015
@@ -33,12 +33,14 @@ import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.MessageFormatException;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 
+import com.google.common.base.Strings;
 import org.junit.Assert;
 
 import org.apache.qpid.client.AMQConnection;
@@ -55,13 +57,11 @@ public class PropertyValueTest extends Q
 
     private AMQConnection _connection;
     private Destination _destination;
-    private AMQSession _session;
+    private Session _session;
     private final List<JMSTextMessage> received = new ArrayList<JMSTextMessage>();
     private final List<String> messages = new ArrayList<String>();
     private Map<String, Destination> _replyToDestinations;
     private int _count = 1;
-    public String _connectionString = "vm://:1";
-    private static final String USERNAME = "guest";
 
     protected void setUp() throws Exception
     {
@@ -164,6 +164,102 @@ public class PropertyValueTest extends Q
         runBatch(50);
     }
 
+    /*
+    QPID-6786
+    */
+    public void testLargeHeader_010_HeadersFillContentHeaderFrame() throws Exception
+    {
+        _connection = (AMQConnection) getConnection();
+        int maximumFrameSize = (int) _connection.getMaximumFrameSize();
+        String propertyName = "string";
+        String propertyValue = generateLongString((int) maximumFrameSize *2);
+        sendReceiveMessageWithHeader(_connection, propertyName, propertyValue);
+    }
+
+    public void testLargeHeader_08091_HeadersFillContentHeaderFrame() throws Exception
+    {
+        _connection = (AMQConnection) getConnection();
+        int maximumFrameSize = (int) _connection.getMaximumFrameSize();
+        String propertyName = "string";
+        int overhead = calculateOverHead_08091_FrameWithHeader(propertyName);
+
+        String propertyValue = generateLongString(maximumFrameSize - overhead);
+        sendReceiveMessageWithHeader(_connection, propertyName, propertyValue);
+    }
+
+    public void testOverlyLargeHeaderRejected_08091() throws Exception
+    {
+        _connection = (AMQConnection) getConnection();
+        _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+        MessageProducer producer = _session.createProducer(_session.createQueue(getTestQueueName()));
+
+        int maximumFrameSize = (int) _connection.getMaximumFrameSize();
+        String propertyName = "string";
+        int overhead = calculateOverHead_08091_FrameWithHeader(propertyName);
+
+        String propertyValue = generateLongString(maximumFrameSize - overhead + 1);
+        try
+        {
+            Message m = _session.createMessage();
+            m.setStringProperty(propertyName, propertyValue);
+            producer.send(m);
+            fail("Exception not thrown");
+        }
+        catch (JMSException je)
+        {
+            assertTrue("Unexpected message " + je.getMessage(), je.getMessage().contains("Unable to send message as the headers are too large"));
+            // PASS
+        }
+    }
+
+
+    private int calculateOverHead_08091_FrameWithHeader(final String propertyName)
+    {
+        int frame = 1 + 2 + 4 + 1;
+        int body = 2 + 2 + 8 + 2;
+        int properties = GUEST_USERNAME.length() + 1  // Username + length
+                         + 1 // DeliveryMode byte
+                         + 1 // Priority byte
+                         + "application/octet-stream".length() + 1 // Encoding + length
+                         + 4  // Headers field table
+                         + "JMS_QPID_DESTTYPE".length() + 1 + 1 + 4
+                         + propertyName.length() + 1 + 1 + 4;
+        return frame + body + properties;
+    }
+
+    private String generateLongString(final int count)
+    {
+        String pattern = "abcde";
+        String str = Strings.repeat(pattern, count / pattern.length()) + pattern.substring(0, count % pattern.length());
+        assertEquals(count, str.length());
+        return str;
+    }
+
+    private void sendReceiveMessageWithHeader(Connection connection,
+                                              final String propName, final String propValue) throws Exception
+    {
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+        Destination destination = session.createQueue(getTestQueueName());
+
+        Message message = session.createMessage();
+        message.setStringProperty(propName, propValue);
+
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDisableMessageID(true);
+        producer.setDisableMessageTimestamp(true);
+        producer.send(message);
+        session.commit();
+
+        Message receivedMessage = consumer.receive(1000);
+        assertNotNull("Message not received", receivedMessage);
+        assertEquals("Message has unexpected property value", propValue, receivedMessage.getStringProperty(propName));
+        session.commit();
+    }
+
     private void runBatch(int runSize)
     {
         try
@@ -174,12 +270,12 @@ public class PropertyValueTest extends Q
                 _logger.error("Run Number:" + run++);
                 try
                 {
-                    init( (AMQConnection) getConnection("guest", "guest"));
+                    init( (AMQConnection) getConnection());
                 }
                 catch (Exception e)
                 {
                     _logger.error("exception:", e);
-                    fail("Unable to initialilse connection: " + e);
+                    fail("Unable to initialise connection: " + e);
                 }
 
                 int count = _count;
@@ -188,7 +284,6 @@ public class PropertyValueTest extends Q
                 check();
                 _logger.info("Completed without failure");
 
-                Thread.sleep(10);
                 _connection.close();
 
                 _logger.error("End Run Number:" + (run - 1));
@@ -303,7 +398,7 @@ public class PropertyValueTest extends Q
             //JMSXUserID
             if (m.getStringProperty("JMSXUserID") != null)
             {
-                Assert.assertEquals("Check 'JMSXUserID' is supported ", USERNAME,
+                Assert.assertEquals("Check 'JMSXUserID' is supported ", QpidBrokerTestCase.GUEST_USERNAME,
                                     m.getStringProperty("JMSXUserID"));
             }
         }
@@ -366,21 +461,4 @@ public class PropertyValueTest extends Q
         return in + System.currentTimeMillis();
     }
 
-    public static void main(String[] argv) throws Exception
-    {
-        PropertyValueTest test = new PropertyValueTest();
-        test._connectionString = (argv.length == 0) ? "vm://:1" : argv[0];
-        test.setUp();
-        if (argv.length > 1)
-        {
-            test._count = Integer.parseInt(argv[1]);
-        }
-
-        test.testOnce();
-    }
-
-    public static junit.framework.Test suite()
-    {
-        return new junit.framework.TestSuite(PropertyValueTest.class);
-    }
 }

Modified: qpid/java/trunk/test-profiles/CPPExcludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/CPPExcludes?rev=1708636&r1=1708635&r2=1708636&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/CPPExcludes (original)
+++ qpid/java/trunk/test-profiles/CPPExcludes Wed Oct 14 15:56:10 2015
@@ -224,3 +224,7 @@ org.apache.qpid.server.protocol.v0_8.*
 //Java Broker BDB System Tests
 org.apache.qpid.server.store.berkeleydb.*
 org.apache.qpid.server.store.berkeleydb.replication.*
+
+// These tests are 0-8..0-91 specific
+org.apache.qpid.test.unit.basic.PropertyValueTest#testLargeHeader_08091_HeadersFillContentHeaderFrame
+org.apache.qpid.test.unit.basic.PropertyValueTest#testOverlyLargeHeaderRejected_08091

Modified: qpid/java/trunk/test-profiles/Java010Excludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/Java010Excludes?rev=1708636&r1=1708635&r2=1708636&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/Java010Excludes (original)
+++ qpid/java/trunk/test-profiles/Java010Excludes Wed Oct 14 15:56:10 2015
@@ -83,3 +83,6 @@ org.apache.qpid.client.failover.AddressB
 
 // This test does not make sense because on 0-10 maxFrameSize is 64kB and the Java Broker sets the networkBufferSize to a minimum of 64kB.
 org.apache.qpid.transport.MaxFrameSizeTest#testTooLargeFrameSize
+
+org.apache.qpid.test.unit.basic.PropertyValueTest#testLargeHeader_08091_HeadersFillContentHeaderFrame
+org.apache.qpid.test.unit.basic.PropertyValueTest#testOverlyLargeHeaderRejected_08091

Modified: qpid/java/trunk/test-profiles/JavaPre010Excludes
URL: http://svn.apache.org/viewvc/qpid/java/trunk/test-profiles/JavaPre010Excludes?rev=1708636&r1=1708635&r2=1708636&view=diff
==============================================================================
--- qpid/java/trunk/test-profiles/JavaPre010Excludes (original)
+++ qpid/java/trunk/test-profiles/JavaPre010Excludes Wed Oct 14 15:56:10 2015
@@ -88,3 +88,5 @@ org.apache.qpid.systest.management.jmx.Q
 // QPID-3396
 org.apache.qpid.test.unit.client.connection.ConnectionTest#testExceptionWhenUserPassIsRequired
 
+// Testing of large 0-10 headers (QPID-6786)
+org.apache.qpid.test.unit.basic.PropertyValueTest#testLargeHeader_010_HeadersFillContentHeaderFrame



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org