You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2013/11/25 16:26:01 UTC
svn commit: r1545320 - in /qpid/jms/trunk/src:
main/java/org/apache/qpid/jms/engine/ main/java/org/apache/qpid/jms/impl/
test/java/org/apache/qpid/jms/ test/java/org/apache/qpid/jms/impl/
test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/
Author: robbie
Date: Mon Nov 25 15:26:01 2013
New Revision: 1545320
URL: http://svn.apache.org/r1545320
Log:
QPIDJMS-9: add support for the durable header field in messages, ensure the producer sets/overrides this as necessary
Added:
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageProducerIntegrationTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java
Modified:
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java?rev=1545320&r1=1545319&r2=1545320&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java Mon Nov 25 15:26:01 2013
@@ -49,11 +49,13 @@ public abstract class AmqpMessage
private volatile Map<String,Object> _applicationPropertiesMap;
/**
- * Used when creating a message that we intend to send
+ * Used when creating a message that we intend to send.
+ * Sets the AMQP durable header to true.
*/
public AmqpMessage()
{
this(Proton.message(), null, null);
+ setDurable(true);
}
/**
@@ -117,6 +119,16 @@ public abstract class AmqpMessage
}
}
+ public void setDurable(boolean durable)
+ {
+ _message.setDurable(durable);
+ }
+
+ public boolean isDurable()
+ {
+ return _message.isDurable();
+ }
+
public void setContentType(String contentType)
{
//TODO: do we need to synchronise this?
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java?rev=1545320&r1=1545319&r2=1545320&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java Mon Nov 25 15:26:01 2013
@@ -55,6 +55,7 @@ public class BytesMessageImpl extends Me
@Override
protected AmqpBytesMessage prepareUnderlyingAmqpMessageForSending(AmqpBytesMessage amqpMessage)
{
+ //TODO: we might be re-sending 'dataIn'
amqpMessage.setBytes(_bytesOut.toByteArray());
//TODO: do we need to do anything later with properties/headers etc?
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java?rev=1545320&r1=1545319&r2=1545320&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java Mon Nov 25 15:26:01 2013
@@ -21,6 +21,7 @@ package org.apache.qpid.jms.impl;
import java.util.Collections;
import java.util.Enumeration;
+import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -186,15 +187,27 @@ public abstract class MessageImpl<T exte
@Override
public int getJMSDeliveryMode() throws JMSException
{
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
+ if(getUnderlyingAmqpMessage(false).isDurable())
+ {
+ return DeliveryMode.PERSISTENT;
+ }
+ else
+ {
+ return DeliveryMode.NON_PERSISTENT;
+ }
}
@Override
public void setJMSDeliveryMode(int deliveryMode) throws JMSException
{
- // TODO Auto-generated method stub
- throw new UnsupportedOperationException("Not Implemented");
+ if(DeliveryMode.PERSISTENT == deliveryMode)
+ {
+ getUnderlyingAmqpMessage(false).setDurable(true);
+ }
+ else
+ {
+ getUnderlyingAmqpMessage(false).setDurable(false);
+ }
}
@Override
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java?rev=1545320&r1=1545319&r2=1545320&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java Mon Nov 25 15:26:01 2013
@@ -33,18 +33,23 @@ public class SenderImpl extends LinkImpl
{
private AmqpSender _amqpSender;
- public SenderImpl(SessionImpl sessionImpl, AmqpSender amqpSender)
+ public SenderImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl, AmqpSender amqpSender)
{
- super(sessionImpl.getConnectionImpl(), amqpSender);
+ super(connectionImpl, amqpSender);
_amqpSender = amqpSender;
}
- @Override
- public void send(Message message) throws JMSException
+ private void sendMessage(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
{
getConnectionImpl().lock();
try
{
+ //set the DeliveryMode if necessary
+ if(deliveryMode != message.getJMSDeliveryMode())
+ {
+ message.setJMSDeliveryMode(deliveryMode);
+ }
+
AmqpMessage amqpMessage = getAmqpMessageFromJmsMessage(message);
AmqpSentMessageToken sentMessage = _amqpSender.sendMessage(amqpMessage);
@@ -59,7 +64,6 @@ public class SenderImpl extends LinkImpl
{
getConnectionImpl().releaseLock();
}
-
}
private AmqpMessage getAmqpMessageFromJmsMessage(Message message)
@@ -75,6 +79,10 @@ public class SenderImpl extends LinkImpl
}
}
+
+ //======= JMS Methods =======
+
+
@Override
public void setDisableMessageID(boolean value) throws JMSException
{
@@ -153,6 +161,12 @@ public class SenderImpl extends LinkImpl
}
@Override
+ public void send(Message message) throws JMSException
+ {
+ sendMessage(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+
+ @Override
public void send(Destination destination, Message message) throws JMSException
{
// TODO Auto-generated method stub
Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1545320&r1=1545319&r2=1545320&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Mon Nov 25 15:26:01 2013
@@ -140,7 +140,7 @@ public class SessionImpl implements Sess
try
{
AmqpSender amqpSender = _amqpSession.createAmqpSender(address);
- SenderImpl sender = new SenderImpl(this, amqpSender);
+ SenderImpl sender = new SenderImpl(this, _connectionImpl, amqpSender);
_connectionImpl.stateChanged();
sender.establish();
return sender;
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java?rev=1545320&r1=1545319&r2=1545320&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageIntegrationTest.java Mon Nov 25 15:26:01 2013
@@ -37,6 +37,7 @@ import org.apache.qpid.jms.test.testpeer
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationPropertiesDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.sections.ApplicationPropertiesSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.proton.amqp.DescribedType;
@@ -89,7 +90,10 @@ public class MessageIntegrationTest exte
appPropsMatcher.withEntry(FLOAT_PROP, equalTo(FLOAT_PROP_VALUE));
appPropsMatcher.withEntry(DOUBLE_PROP, equalTo(DOUBLE_PROP_VALUE));
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setApplicationPropertiesMatcher(appPropsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
testPeer.expectTransfer(messageMatcher);
Added: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageProducerIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageProducerIntegrationTest.java?rev=1545320&view=auto
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageProducerIntegrationTest.java (added)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/MessageProducerIntegrationTest.java Mon Nov 25 15:26:01 2013
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.junit.Test;
+
+public class MessageProducerIntegrationTest extends QpidJmsTestCase
+{
+ private final IntegrationTestFixture _testFixture = new IntegrationTestFixture();
+
+ @Test
+ public void testDefaultDeliveryModeProducesDurableMessages() throws Exception
+ {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+
+ //Create and transfer a new message
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createTextMessage();
+ assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+
+ producer.send(message);
+ assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+ }
+ }
+
+ @Test
+ public void testProducerOverridesMessageDeliveryMode() throws Exception
+ {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+ {
+ Connection connection = _testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+
+ //Create and transfer a new message, explicitly setting the deliveryMode on the
+ //message (which applications shouldn't) to NON_PERSISTENT and sending it to check
+ //that the producer ignores this value and sends the message as PERSISTENT(/durable)
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createTextMessage();
+ message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ assertEquals(DeliveryMode.NON_PERSISTENT, message.getJMSDeliveryMode());
+
+ producer.send(message);
+
+ assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+ }
+ }
+}
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java?rev=1545320&r1=1545319&r2=1545320&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java Mon Nov 25 15:26:01 2013
@@ -40,6 +40,7 @@ import org.apache.qpid.jms.test.testpeer
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.DataDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
@@ -82,7 +83,9 @@ public class SessionIntegrationTest exte
MessageProducer producer = session.createProducer(queue);
String text = "myMessage";
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
testPeer.expectTransfer(messageMatcher);
@@ -135,7 +138,9 @@ public class SessionIntegrationTest exte
Queue queue = session.createQueue("myQueue");
MessageProducer producer = session.createProducer(queue);
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
testPeer.expectTransfer(messageMatcher);
@@ -189,9 +194,11 @@ public class SessionIntegrationTest exte
byte[] content = "myBytes".getBytes();
- TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
MessagePropertiesSectionMatcher propertiesMatcher = new MessagePropertiesSectionMatcher(true);
propertiesMatcher.withContentType(equalTo(Symbol.valueOf(AmqpBytesMessage.CONTENT_TYPE)));
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setPropertiesMatcher(propertiesMatcher);
messageMatcher.setMessageContentMatcher(new EncodedDataMatcher(new Binary(content)));
Added: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java?rev=1545320&view=auto
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java (added)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java Mon Nov 25 15:26:01 2013
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import javax.jms.DeliveryMode;
+
+import org.apache.qpid.jms.QpidJmsTestCase;
+import org.apache.qpid.jms.engine.AmqpMessage;
+import org.apache.qpid.jms.engine.AmqpSender;
+import org.apache.qpid.jms.engine.AmqpSentMessageToken;
+import org.apache.qpid.jms.engine.TestAmqpMessage;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class SenderImplTest extends QpidJmsTestCase
+{
+ private ConnectionImpl _mockConnection;
+ private AmqpSender _mockAmqpSender;
+ private SessionImpl _mockSession;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ _mockConnection = Mockito.mock(ConnectionImpl.class);
+ _mockAmqpSender = Mockito.mock(AmqpSender.class);
+ _mockSession = Mockito.mock(SessionImpl.class);
+ }
+
+ @Test
+ public void testSenderOverriddesMessageDeliveryMode() throws Exception
+ {
+ //Create mock sent message token, ensure that it is immediately marked as Accepted
+ AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
+ Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
+ Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
+ ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+ SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender);
+
+ TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+ TestMessageImpl testMessage = new TestMessageImpl(testAmqpMessage, null, null)
+ {
+ @Override
+ protected TestAmqpMessage prepareUnderlyingAmqpMessageForSending(TestAmqpMessage amqpMessage)
+ {
+ //NO-OP
+ return amqpMessage;
+ }
+ };
+
+ testMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ assertEquals(DeliveryMode.NON_PERSISTENT, testMessage.getJMSDeliveryMode());
+
+ senderImpl.send(testMessage);
+
+ assertEquals(DeliveryMode.PERSISTENT, testMessage.getJMSDeliveryMode());
+ }
+}
Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java?rev=1545320&r1=1545319&r2=1545320&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java Mon Nov 25 15:26:01 2013
@@ -33,6 +33,9 @@ import org.hamcrest.TypeSafeMatcher;
*/
public class TransferPayloadCompositeMatcher extends TypeSafeMatcher<Binary>
{
+ private MessageHeaderSectionMatcher _msgHeadersMatcher;
+ private String _msgHeaderMatcherFailureDescription;
+
private MessageAnnotationsSectionMatcher _msgAnnotationsMatcher;
private String _msgAnnotationsMatcherFailureDescription;
private MessagePropertiesSectionMatcher _propsMatcher;
@@ -52,6 +55,23 @@ public class TransferPayloadCompositeMat
int origLength = receivedBinary.getLength();
int bytesConsumed = 0;
+ //MessageHeader Section
+ if(_msgHeadersMatcher != null)
+ {
+ Binary msgHeaderEtcSubBinary = receivedBinary.subBinary(bytesConsumed, origLength - bytesConsumed);
+ try
+ {
+ bytesConsumed += _msgHeadersMatcher.verify(msgHeaderEtcSubBinary);
+ }
+ catch(Throwable t)
+ {
+ _msgHeaderMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to MessageHeaderMatcher: " + msgHeaderEtcSubBinary;
+ _msgHeaderMatcherFailureDescription += "\nMessageHeaderMatcher generated throwable: " + t;
+
+ return false;
+ }
+ }
+
//MessageAnnotations Section
if(_msgAnnotationsMatcher != null)
{
@@ -62,8 +82,8 @@ public class TransferPayloadCompositeMat
}
catch(Throwable t)
{
- _propsMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to MessageAnnotationsMatcher: " + msgAnnotationsEtcSubBinary;
- _propsMatcherFailureDescription += "\nMessageAnnotationsMatcher generated throwable: " + t;
+ _msgAnnotationsMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to MessageAnnotationsMatcher: " + msgAnnotationsEtcSubBinary;
+ _msgAnnotationsMatcherFailureDescription += "\nMessageAnnotationsMatcher generated throwable: " + t;
return false;
}
@@ -96,8 +116,8 @@ public class TransferPayloadCompositeMat
}
catch(Throwable t)
{
- _propsMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to ApplicationPropertiesMatcher: " + appPropsEtcSubBinary;
- _propsMatcherFailureDescription += "\nApplicationPropertiesMatcher generated throwable: " + t;
+ _appPropsMatcherFailureDescription = "\nActual encoded form of remaining bytes passed to ApplicationPropertiesMatcher: " + appPropsEtcSubBinary;
+ _appPropsMatcherFailureDescription += "\nApplicationPropertiesMatcher generated throwable: " + t;
return false;
}
@@ -136,6 +156,14 @@ public class TransferPayloadCompositeMat
{
mismatchDescription.appendText("\nActual encoded form of the full Transfer frame payload: ").appendValue(item);
+ //MessageHeaders Section
+ if(_msgHeaderMatcherFailureDescription != null)
+ {
+ mismatchDescription.appendText("\nMessageHeadersMatcherFailed!");
+ mismatchDescription.appendText(_msgHeaderMatcherFailureDescription);
+ return;
+ }
+
//MessageAnnotations Section
if(_msgAnnotationsMatcherFailureDescription != null)
{
@@ -169,6 +197,11 @@ public class TransferPayloadCompositeMat
}
}
+ public void setHeadersMatcher(MessageHeaderSectionMatcher msgHeadersMatcher)
+ {
+ _msgHeadersMatcher = msgHeadersMatcher;
+ }
+
public void setMessageAnnotationsMatcher(MessageAnnotationsSectionMatcher msgAnnotationsMatcher)
{
_msgAnnotationsMatcher = msgAnnotationsMatcher;
@@ -179,13 +212,13 @@ public class TransferPayloadCompositeMat
_propsMatcher = propsMatcher;
}
- public void setMessageContentMatcher(Matcher<Binary> msgContentMatcher)
+ public void setApplicationPropertiesMatcher(ApplicationPropertiesSectionMatcher appPropsMatcher)
{
- _msgContentMatcher = msgContentMatcher;
+ _appPropsMatcher = appPropsMatcher;
}
- public void setApplicationPropertiesMatcher(ApplicationPropertiesSectionMatcher appPropsMatcher)
+ public void setMessageContentMatcher(Matcher<Binary> msgContentMatcher)
{
- _appPropsMatcher = appPropsMatcher;
+ _msgContentMatcher = msgContentMatcher;
}
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org