You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2013/11/25 16:26:01 UTC

svn commit: r1545320 - in /qpid/jms/trunk/src: main/java/org/apache/qpid/jms/engine/ main/java/org/apache/qpid/jms/impl/ test/java/org/apache/qpid/jms/ test/java/org/apache/qpid/jms/impl/ test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/

Author: robbie
Date: Mon Nov 25 15:26:01 2013
New Revision: 1545320

URL: http://svn.apache.org/r1545320
Log:
QPIDJMS-9: add support for the durable header field in messages, ensure the producer sets/overrides this as necessary

Added:
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageProducerIntegrationTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java
Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java?rev=1545320&r1=1545319&r2=1545320&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java Mon Nov 25 15:26:01 2013
@@ -49,11 +49,13 @@ public abstract class AmqpMessage
     private volatile Map<String,Object> _applicationPropertiesMap;
 
     /**
-     * Used when creating a message that we intend to send
+     * Used when creating a message that we intend to send.
+     * Sets the AMQP durable header to true.
      */
     public AmqpMessage()
     {
         this(Proton.message(), null, null);
+        setDurable(true);
     }
 
     /**
@@ -117,6 +119,16 @@ public abstract class AmqpMessage
         }
     }
 
+    public void setDurable(boolean durable)
+    {
+        _message.setDurable(durable);
+    }
+
+    public boolean isDurable()
+    {
+        return _message.isDurable();
+    }
+
     public void setContentType(String contentType)
     {
         //TODO: do we need to synchronise this?

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java?rev=1545320&r1=1545319&r2=1545320&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java Mon Nov 25 15:26:01 2013
@@ -55,6 +55,7 @@ public class BytesMessageImpl extends Me
     @Override
     protected AmqpBytesMessage prepareUnderlyingAmqpMessageForSending(AmqpBytesMessage amqpMessage)
     {
+        //TODO: we might be re-sending 'dataIn'
         amqpMessage.setBytes(_bytesOut.toByteArray());
 
         //TODO: do we need to do anything later with properties/headers etc?

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java?rev=1545320&r1=1545319&r2=1545320&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java Mon Nov 25 15:26:01 2013
@@ -21,6 +21,7 @@ package org.apache.qpid.jms.impl;
 import java.util.Collections;
 import java.util.Enumeration;
 
+import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -186,15 +187,27 @@ public abstract class MessageImpl<T exte
     @Override
     public int getJMSDeliveryMode() throws JMSException
     {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Not Implemented");
+        if(getUnderlyingAmqpMessage(false).isDurable())
+        {
+            return DeliveryMode.PERSISTENT;
+        }
+        else
+        {
+            return DeliveryMode.NON_PERSISTENT;
+        }
     }
 
     @Override
     public void setJMSDeliveryMode(int deliveryMode) throws JMSException
     {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Not Implemented");
+        if(DeliveryMode.PERSISTENT == deliveryMode)
+        {
+            getUnderlyingAmqpMessage(false).setDurable(true);
+        }
+        else
+        {
+            getUnderlyingAmqpMessage(false).setDurable(false);
+        }
     }
 
     @Override

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java?rev=1545320&r1=1545319&r2=1545320&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java Mon Nov 25 15:26:01 2013
@@ -33,18 +33,23 @@ public class SenderImpl extends LinkImpl
 {
     private AmqpSender _amqpSender;
 
-    public SenderImpl(SessionImpl sessionImpl, AmqpSender amqpSender)
+    public SenderImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl, AmqpSender amqpSender)
     {
-        super(sessionImpl.getConnectionImpl(), amqpSender);
+        super(connectionImpl, amqpSender);
         _amqpSender = amqpSender;
     }
 
-    @Override
-    public void send(Message message) throws JMSException
+    private void sendMessage(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
     {
         getConnectionImpl().lock();
         try
         {
+            //set the DeliveryMode if necessary
+            if(deliveryMode != message.getJMSDeliveryMode())
+            {
+                message.setJMSDeliveryMode(deliveryMode);
+            }
+
             AmqpMessage amqpMessage = getAmqpMessageFromJmsMessage(message);
 
             AmqpSentMessageToken sentMessage = _amqpSender.sendMessage(amqpMessage);
@@ -59,7 +64,6 @@ public class SenderImpl extends LinkImpl
         {
             getConnectionImpl().releaseLock();
         }
-
     }
 
     private AmqpMessage getAmqpMessageFromJmsMessage(Message message)
@@ -75,6 +79,10 @@ public class SenderImpl extends LinkImpl
         }
     }
 
+
+    //======= JMS Methods =======
+
+
     @Override
     public void setDisableMessageID(boolean value) throws JMSException
     {
@@ -153,6 +161,12 @@ public class SenderImpl extends LinkImpl
     }
 
     @Override
+    public void send(Message message) throws JMSException
+    {
+        sendMessage(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+    }
+
+    @Override
     public void send(Destination destination, Message message) throws JMSException
     {
         // TODO Auto-generated method stub

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1545320&r1=1545319&r2=1545320&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Mon Nov 25 15:26:01 2013
@@ -140,7 +140,7 @@ public class SessionImpl implements Sess
         try
         {
             AmqpSender amqpSender = _amqpSession.createAmqpSender(address);
-            SenderImpl sender = new SenderImpl(this, amqpSender);
+            SenderImpl sender = new SenderImpl(this, _connectionImpl, amqpSender);
             _connectionImpl.stateChanged();
             sender.establish();
             return sender;

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java?rev=1545320&r1=1545319&r2=1545320&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java Mon Nov 25 15:26:01 2013
@@ -37,6 +37,7 @@ import org.apache.qpid.jms.test.testpeer
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationPropertiesDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.ApplicationPropertiesSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
 import org.apache.qpid.proton.amqp.DescribedType;
@@ -89,7 +90,10 @@ public class MessageIntegrationTest exte
             appPropsMatcher.withEntry(FLOAT_PROP, equalTo(FLOAT_PROP_VALUE));
             appPropsMatcher.withEntry(DOUBLE_PROP, equalTo(DOUBLE_PROP_VALUE));
 
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+
             TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
             messageMatcher.setApplicationPropertiesMatcher(appPropsMatcher);
             messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
             testPeer.expectTransfer(messageMatcher);

Added: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageProducerIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageProducerIntegrationTest.java?rev=1545320&view=auto
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageProducerIntegrationTest.java (added)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageProducerIntegrationTest.java Mon Nov 25 15:26:01 2013
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.junit.Test;
+
+public class MessageProducerIntegrationTest extends QpidJmsTestCase
+{
+    private final IntegrationTestFixture _testFixture = new IntegrationTestFixture();
+
+    @Test
+    public void testDefaultDeliveryModeProducesDurableMessages() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer producer = session.createProducer(queue);
+
+            //Create and transfer a new message
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            testPeer.expectTransfer(messageMatcher);
+
+            Message message = session.createTextMessage();
+            assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+
+            producer.send(message);
+            assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+        }
+    }
+
+    @Test
+    public void testProducerOverridesMessageDeliveryMode() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer producer = session.createProducer(queue);
+
+            //Create and transfer a new message, explicitly setting the deliveryMode on the
+            //message to NON_PERSISTENT (which applications shouldn't do) and sending it to check
+            //that the producer overrides this value and sends the message as PERSISTENT (durable)
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            testPeer.expectTransfer(messageMatcher);
+
+            Message message = session.createTextMessage();
+            message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
+            assertEquals(DeliveryMode.NON_PERSISTENT, message.getJMSDeliveryMode());
+
+            producer.send(message);
+
+            assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+        }
+    }
+}

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java?rev=1545320&r1=1545319&r2=1545320&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java Mon Nov 25 15:26:01 2013
@@ -40,6 +40,7 @@ import org.apache.qpid.jms.test.testpeer
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.DataDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
@@ -82,7 +83,9 @@ public class SessionIntegrationTest exte
             MessageProducer producer = session.createProducer(queue);
 
             String text = "myMessage";
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
             TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
             messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
             testPeer.expectTransfer(messageMatcher);
 
@@ -135,7 +138,9 @@ public class SessionIntegrationTest exte
             Queue queue = session.createQueue("myQueue");
             MessageProducer producer = session.createProducer(queue);
 
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
             TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
             messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
             testPeer.expectTransfer(messageMatcher);
 
@@ -189,9 +194,11 @@ public class SessionIntegrationTest exte
 
             byte[] content = "myBytes".getBytes();
 
-            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
             MessagePropertiesSectionMatcher propertiesMatcher = new MessagePropertiesSectionMatcher(true);
             propertiesMatcher.withContentType(equalTo(Symbol.valueOf(AmqpBytesMessage.CONTENT_TYPE)));
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
             messageMatcher.setPropertiesMatcher(propertiesMatcher);
             messageMatcher.setMessageContentMatcher(new EncodedDataMatcher(new Binary(content)));
 

Added: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java?rev=1545320&view=auto
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java (added)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java Mon Nov 25 15:26:01 2013
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import javax.jms.DeliveryMode;
+
+import org.apache.qpid.jms.QpidJmsTestCase;
+import org.apache.qpid.jms.engine.AmqpMessage;
+import org.apache.qpid.jms.engine.AmqpSender;
+import org.apache.qpid.jms.engine.AmqpSentMessageToken;
+import org.apache.qpid.jms.engine.TestAmqpMessage;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class SenderImplTest extends QpidJmsTestCase
+{
+    private ConnectionImpl _mockConnection;
+    private AmqpSender _mockAmqpSender;
+    private SessionImpl _mockSession;
+
+    @Before
+    public void setUp() throws Exception
+    {
+        _mockConnection = Mockito.mock(ConnectionImpl.class);
+        _mockAmqpSender = Mockito.mock(AmqpSender.class);
+        _mockSession = Mockito.mock(SessionImpl.class);
+    }
+
+    @Test
+    public void testSenderOverriddesMessageDeliveryMode() throws Exception
+    {
+        //Create mock sent message token, ensure that it is immediately marked as Accepted
+        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
+        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
+        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
+        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+        SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender);
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+        TestMessageImpl testMessage = new TestMessageImpl(testAmqpMessage, null, null)
+        {
+            @Override
+            protected TestAmqpMessage prepareUnderlyingAmqpMessageForSending(TestAmqpMessage amqpMessage)
+            {
+                //NO-OP
+                return amqpMessage;
+            }
+        };
+
+        testMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        assertEquals(DeliveryMode.NON_PERSISTENT, testMessage.getJMSDeliveryMode());
+
+        senderImpl.send(testMessage);
+
+        assertEquals(DeliveryMode.PERSISTENT, testMessage.getJMSDeliveryMode());
+    }
+}

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java?rev=1545320&r1=1545319&r2=1545320&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java Mon Nov 25 15:26:01 2013
@@ -33,6 +33,9 @@ import org.hamcrest.TypeSafeMatcher;
  */
 public class TransferPayloadCompositeMatcher extends TypeSafeMatcher<Binary>
 {
+    private MessageHeaderSectionMatcher _msgHeadersMatcher;
+    private String _msgHeaderMatcherFailureDescription;
+
     private MessageAnnotationsSectionMatcher _msgAnnotationsMatcher;
     private String _msgAnnotationsMatcherFailureDescription;
     private MessagePropertiesSectionMatcher _propsMatcher;
@@ -52,6 +55,23 @@ public class TransferPayloadCompositeMat
         int origLength = receivedBinary.getLength();
         int bytesConsumed = 0;
 
+        //MessageHeader Section
+        if(_msgHeadersMatcher != null)
+        {
+            Binary msgHeaderEtcSubBinary = receivedBinary.subBinary(bytesConsumed, origLength - bytesConsumed);
+            try
+            {
+                bytesConsumed += _msgHeadersMatcher.verify(msgHeaderEtcSubBinary);
+            }
+            catch(Throwable t)
+            {
+                _msgHeaderMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to MessageHeaderMatcher: " + msgHeaderEtcSubBinary;
+                _msgHeaderMatcherFailureDescription += "\nMessageHeaderMatcher generated throwable: " + t;
+
+                return false;
+            }
+        }
+
         //MessageAnnotations Section
         if(_msgAnnotationsMatcher != null)
         {
@@ -62,8 +82,8 @@ public class TransferPayloadCompositeMat
             }
             catch(Throwable t)
             {
-                _propsMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to MessageAnnotationsMatcher: " + msgAnnotationsEtcSubBinary;
-                _propsMatcherFailureDescription += "\nMessageAnnotationsMatcher generated throwable: " + t;
+                _msgAnnotationsMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to MessageAnnotationsMatcher: " + msgAnnotationsEtcSubBinary;
+                _msgAnnotationsMatcherFailureDescription += "\nMessageAnnotationsMatcher generated throwable: " + t;
 
                 return false;
             }
@@ -96,8 +116,8 @@ public class TransferPayloadCompositeMat
             }
             catch(Throwable t)
             {
-                _propsMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to ApplicationPropertiesMatcher: " + appPropsEtcSubBinary;
-                _propsMatcherFailureDescription += "\nApplicationPropertiesMatcher generated throwable: " + t;
+                _appPropsMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to ApplicationPropertiesMatcher: " + appPropsEtcSubBinary;
+                _appPropsMatcherFailureDescription += "\nApplicationPropertiesMatcher generated throwable: " + t;
 
                 return false;
             }
@@ -136,6 +156,14 @@ public class TransferPayloadCompositeMat
     {
         mismatchDescription.appendText("\nActual encoded form of the full Transfer frame payload: ").appendValue(item);
 
+        //MessageHeaders Section
+        if(_msgHeaderMatcherFailureDescription != null)
+        {
+            mismatchDescription.appendText("\nMessageHeadersMatcherFailed!");
+            mismatchDescription.appendText(_msgHeaderMatcherFailureDescription);
+            return;
+        }
+
         //MessageAnnotations Section
         if(_msgAnnotationsMatcherFailureDescription != null)
         {
@@ -169,6 +197,11 @@ public class TransferPayloadCompositeMat
         }
     }
 
+    public void setHeadersMatcher(MessageHeaderSectionMatcher msgHeadersMatcher)
+    {
+        _msgHeadersMatcher = msgHeadersMatcher;
+    }
+
     public void setMessageAnnotationsMatcher(MessageAnnotationsSectionMatcher msgAnnotationsMatcher)
     {
         _msgAnnotationsMatcher = msgAnnotationsMatcher;
@@ -179,13 +212,13 @@ public class TransferPayloadCompositeMat
         _propsMatcher = propsMatcher;
     }
 
-    public void setMessageContentMatcher(Matcher<Binary> msgContentMatcher)
+    public void setApplicationPropertiesMatcher(ApplicationPropertiesSectionMatcher appPropsMatcher)
     {
-        _msgContentMatcher = msgContentMatcher;
+        _appPropsMatcher = appPropsMatcher;
     }
 
-    public void setApplicationPropertiesMatcher(ApplicationPropertiesSectionMatcher appPropsMatcher)
+    public void setMessageContentMatcher(Matcher<Binary> msgContentMatcher)
     {
-        _appPropsMatcher = appPropsMatcher;
+        _msgContentMatcher = msgContentMatcher;
     }
 }
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org