You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/03/02 12:49:54 UTC
[1/4] qpid-jms git commit: add support for remotely closing links
Repository: qpid-jms
Updated Branches:
refs/heads/master 6a2b98e0d -> 8e5662ed6
add support for remotely closing links
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b82f427f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b82f427f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b82f427f
Branch: refs/heads/master
Commit: b82f427f2be1dd56e265da1c92d5024053d192d5
Parents: 6a2b98e
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Mar 2 11:03:12 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Mar 2 11:03:12 2015 +0000
----------------------------------------------------------------------
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 55 ++++++++++++++++++--
1 file changed, 52 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b82f427f/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index d25688d..001adee 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -109,6 +109,7 @@ public class TestAmqpPeer implements AutoCloseable
private byte[] _deferredBytes;
private int _lastInitiatedChannel = -1;
+ private UnsignedInteger _lastInitiatedLinkHandle = null;
public TestAmqpPeer() throws IOException
{
@@ -501,8 +502,10 @@ public class TestAmqpPeer implements AutoCloseable
@Override
public void setValues()
{
+ Object receivedHandle = attachMatcher.getReceivedHandle();
+
attachResponseSender.setChannel(attachMatcher.getActualChannel());
- attachResponse.setHandle(attachMatcher.getReceivedHandle());
+ attachResponse.setHandle(receivedHandle);
attachResponse.setName(attachMatcher.getReceivedName());
attachResponse.setSource(attachMatcher.getReceivedSource());
@@ -511,6 +514,8 @@ public class TestAmqpPeer implements AutoCloseable
trimTargetCapabilities(t);
attachResponse.setTarget(t);
+
+ _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
}
});
@@ -575,8 +580,10 @@ public class TestAmqpPeer implements AutoCloseable
@Override
public void setValues()
{
+ Object receivedHandle = attachMatcher.getReceivedHandle();
+
attachResponseSender.setChannel(attachMatcher.getActualChannel());
- attachResponse.setHandle(attachMatcher.getReceivedHandle());
+ attachResponse.setHandle(receivedHandle);
attachResponse.setName(attachMatcher.getReceivedName());
attachResponse.setSource(trimSourceOutcomesCapabilities(createSourceObjectFromDescribedType(attachMatcher.getReceivedSource())));
if(refuseLink) {
@@ -584,6 +591,8 @@ public class TestAmqpPeer implements AutoCloseable
} else {
attachResponse.setTarget(trimTargetCapabilities(createTargetObjectFromDescribedType(attachMatcher.getReceivedTarget())));
}
+
+ _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
}
});
@@ -666,8 +675,10 @@ public class TestAmqpPeer implements AutoCloseable
@Override
public void setValues()
{
+ Object receivedHandle = attachMatcher.getReceivedHandle();
+
attachResponseSender.setChannel(attachMatcher.getActualChannel());
- attachResponse.setHandle(attachMatcher.getReceivedHandle());
+ attachResponse.setHandle(receivedHandle);
attachResponse.setName(attachMatcher.getReceivedName());
attachResponse.setTarget(attachMatcher.getReceivedTarget());
if(refuseLink) {
@@ -675,6 +686,8 @@ public class TestAmqpPeer implements AutoCloseable
} else {
attachResponse.setSource(trimSourceOutcomesCapabilities(createSourceObjectFromDescribedType(attachMatcher.getReceivedSource())));
}
+
+ _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
}
});
@@ -1131,6 +1144,42 @@ public class TestAmqpPeer implements AutoCloseable
}
}
+ public void remotelyDetachLastOpenedLinkOnLastOpenedSession(boolean expectDetachResponse, boolean closed) {
+ synchronized (_handlersLock) {
+ CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
+
+ // Now generate the Detach for the appropriate link on the appropriate session
+ final DetachFrame detachFrame = new DetachFrame();
+ detachFrame.setClosed(closed);
+ // TODO: add an optional error msg+condition?
+
+ // The response frame channel will be dynamically set based on the previous frames. Using the -1 is an illegal placeholder.
+ final FrameSender frameSender = new FrameSender(this, FrameType.AMQP, -1, detachFrame, null);
+ frameSender.setValueProvider(new ValueProvider() {
+ @Override
+ public void setValues() {
+ frameSender.setChannel(_lastInitiatedChannel);
+ detachFrame.setHandle(_lastInitiatedLinkHandle);
+ }
+ });
+ comp.add(frameSender);
+
+ if (expectDetachResponse) {
+ Matcher<Boolean> closeMatcher = null;
+ if (closed) {
+ closeMatcher = equalTo(true);
+ } else {
+ closeMatcher = Matchers.anyOf(equalTo(false), nullValue());
+ }
+
+ // Expect a response to our Detach.
+ final DetachMatcher detachMatcher = new DetachMatcher().withClosed(closeMatcher);
+ // TODO: enable matching on the channel number of the response.
+ addHandler(detachMatcher);
+ }
+ }
+ }
+
private CompositeAmqpPeerRunnable insertCompsiteActionForLastHandler() {
CompositeAmqpPeerRunnable comp = new CompositeAmqpPeerRunnable();
Handler h = getLastHandler();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[4/4] qpid-jms git commit: rename test class name to use JMS
convention like the other classes in the package
Posted by ro...@apache.org.
rename test class name to use JMS convention like the other classes in the package
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/8e5662ed
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/8e5662ed
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/8e5662ed
Branch: refs/heads/master
Commit: 8e5662ed6e81465d8cb74407bac4cbd2ab65175e
Parents: 3ebbe55
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Mar 2 11:15:26 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Mar 2 11:15:26 2015 +0000
----------------------------------------------------------------------
.../integration/ProducerIntegrationTest.java | 556 +++++++++++++++++++
.../jms/integration/SenderIntegrationTest.java | 556 -------------------
2 files changed, 556 insertions(+), 556 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8e5662ed/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
new file mode 100644
index 0000000..88c7028
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.isA;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Date;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.IllegalStateException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.Wait;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.proton.amqp.UnsignedByte;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+
+public class ProducerIntegrationTest extends QpidJmsTestCase {
+ private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+ @Test(timeout = 10000)
+ public void testCloseSender() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin(true);
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+
+ testPeer.expectDetach(true, true, true);
+ producer.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testDefaultDeliveryModeProducesDurableMessages() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin(true);
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+
+ // Create and transfer a new message
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true)
+ .withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createTextMessage();
+
+ producer.send(message);
+ assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testProducerOverridesMessageDeliveryMode() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin(true);
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+
+ // Create and transfer a new message, explicitly setting the deliveryMode on the
+ // message (which applications shouldn't) to NON_PERSISTENT and sending it to check
+ // that the producer ignores this value and sends the message as PERSISTENT(/durable)
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true)
+ .withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createTextMessage();
+ message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ assertEquals(DeliveryMode.NON_PERSISTENT, message.getJMSDeliveryMode());
+
+ producer.send(message);
+
+ assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ /**
+ * Test that when a message is sent the JMSDestination header is set to
+ * the Destination used by the producer.
+ */
+ @Test(timeout = 5000)
+ public void testSendingMessageSetsJMSDestination() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin(true);
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+
+ String text = "myMessage";
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createTextMessage(text);
+
+ assertNull("Should not yet have a JMSDestination", message.getJMSDestination());
+
+ producer.send(message);
+
+ assertEquals("Should have had JMSDestination set", queue, message.getJMSDestination());
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testSendingMessageSetsJMSTimestamp() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin(true);
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+
+ // Create matcher to expect the absolute-expiry-time field of the properties section to
+ // be set to a value greater than 'now'+ttl, within a delta.
+ long currentTime = System.currentTimeMillis();
+ Date creationLower = new Date(currentTime);
+ Date creationUpper = new Date(currentTime + 3000);
+ Matcher<Date> inRange = both(greaterThanOrEqualTo(creationLower)).and(lessThanOrEqualTo(creationUpper));
+
+ String text = "myMessage";
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true)
+ .withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true)
+ .withCreationTime(inRange);
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createTextMessage(text);
+
+ producer.send(message);
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ /**
+ * Test that after sending a message with the disableMessageTimestamp hint set, the
+ * message object has a 0 JMSTimestamp value, and no creation-time field value was set.
+ */
+ @Test(timeout = 5000)
+ public void testSendingMessageWithDisableMessageTimestampHint() throws Exception {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin(true);
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+
+ String text = "myMessage";
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
+ propsMatcher.withCreationTime(nullValue()); // Check there is no creation-time value;
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createTextMessage(text);
+
+ assertEquals("JMSTimestamp should not yet be set", 0, message.getJMSTimestamp());
+
+ producer.setDisableMessageTimestamp(true);
+
+ producer.send(message);
+ testPeer.waitForAllHandlersToComplete(1000);
+
+ assertEquals("JMSTimestamp should still not be set", 0, message.getJMSTimestamp());
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testSendingMessageSetsJMSExpirationRelatedAbsoluteExpiryAndTtlFields() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin(true);
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+
+ long currentTime = System.currentTimeMillis();
+ long ttl = 100_000;
+
+ Date expirationLower = new Date(currentTime + ttl);
+ Date expirationUpper = new Date(currentTime + ttl + 3000);
+
+ // Create matcher to expect the absolute-expiry-time field of the properties section to
+ // be set to a value greater than 'now'+ttl, within a delta.
+ Matcher<Date> inRange = both(greaterThanOrEqualTo(expirationLower)).and(lessThanOrEqualTo(expirationUpper));
+
+ String text = "myMessage";
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+ headersMatcher.withDurable(equalTo(true));
+ headersMatcher.withTtl(equalTo(UnsignedInteger.valueOf(ttl)));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true)
+ .withAbsoluteExpiryTime(inRange);
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createTextMessage(text);
+
+ producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, ttl);
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testSendingMessageWithJMS_AMQP_TTLSetPositive() throws Exception {
+ sendingMessageWithJMS_AMQP_TTLSetTestImpl(100_000, 20_000);
+ }
+
+ @Test(timeout = 10000)
+ public void testSendingMessageWithJMS_AMQP_TTLSetZero() throws Exception {
+ sendingMessageWithJMS_AMQP_TTLSetTestImpl(50_000, 0);
+ }
+
+ public void sendingMessageWithJMS_AMQP_TTLSetTestImpl(long jmsTtl, long amqpTtl) throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin(true);
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+
+ long currentTime = System.currentTimeMillis();
+ Date expirationLower = new Date(currentTime + jmsTtl);
+ Date expirationUpper = new Date(currentTime + jmsTtl + 3000);
+
+ // Create matcher to expect the absolute-expiry-time field of the properties section to
+ // be set to a value greater than 'now'+ttl, within a delta.
+ Matcher<Date> inRange = both(greaterThanOrEqualTo(expirationLower)).and(lessThanOrEqualTo(expirationUpper));
+
+ String text = "myMessage";
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+ headersMatcher.withDurable(equalTo(true));
+ // verify the ttl field matches the JMS_AMQP_TTL value, rather than the standard JMS send TTL value.
+ if (amqpTtl == 0) {
+ headersMatcher.withTtl(nullValue());
+ } else {
+ headersMatcher.withTtl(equalTo(UnsignedInteger.valueOf(amqpTtl)));
+ }
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true)
+ .withAbsoluteExpiryTime(inRange);
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createTextMessage(text);
+ message.setLongProperty(AmqpMessageSupport.JMS_AMQP_TTL, amqpTtl);
+
+ producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, jmsTtl);
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
+
+ /**
+ * Test that when a message is sent with default priority of 4, the emitted AMQP message has no value in the header
+ * priority field, since the default for that field is already 4.
+ */
+ @Test(timeout = 10000)
+ public void testDefaultPriorityProducesMessagesWithoutPriorityField() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin(true);
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+
+ // Create and transfer a new message
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true)
+ .withPriority(equalTo(null));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createTextMessage();
+
+ assertEquals(Message.DEFAULT_PRIORITY, message.getJMSPriority());
+
+ producer.send(message);
+
+ assertEquals(Message.DEFAULT_PRIORITY, message.getJMSPriority());
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ /**
+ * Test that when a message is sent with a non-default priority, the emitted AMQP message has that value in the
+ * header priority field, and the JMS message has had JMSPriority set.
+ */
+ @Test(timeout = 10000)
+ public void testNonDefaultPriorityProducesMessagesWithPriorityFieldAndSetsJMSPriority() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin(true);
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageProducer producer = session.createProducer(queue);
+
+ byte priority = 5;
+
+ // Create and transfer a new message
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true)
+ .withPriority(equalTo(UnsignedByte.valueOf(priority)));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createTextMessage();
+
+ assertEquals(Message.DEFAULT_PRIORITY, message.getJMSPriority());
+
+ producer.send(message, DeliveryMode.PERSISTENT, priority, Message.DEFAULT_TIME_TO_LIVE);
+
+ assertEquals(priority, message.getJMSPriority());
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ /**
+ * Test that upon sending a message, the sender sets the JMSMessageID on the Message object,
+ * and that the value is included in the AMQP message sent by the client.
+ */
+ @Test(timeout = 10000)
+ public void testSendingMessageSetsJMSMessageID() throws Exception {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin(true);
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+
+ String text = "myMessage";
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withMessageId(isA(String.class));
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createTextMessage(text);
+
+ assertNull("JMSMessageID should not yet be set", message.getJMSMessageID());
+
+ producer.send(message);
+
+ String jmsMessageID = message.getJMSMessageID();
+ assertNotNull("JMSMessageID should be set", jmsMessageID);
+ assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:"));
+
+ //Get the value that was actually transmitted/received, verify it is a string, compare to what we have locally
+ testPeer.waitForAllHandlersToComplete(1000);
+ Object receivedMessageId = propsMatcher.getReceivedMessageId();
+
+ assertTrue("Expected string message id to be sent", receivedMessageId instanceof String);
+ assertTrue("Expected JMSMessageId value to be present in AMQP message", jmsMessageID.endsWith((String)receivedMessageId));
+ }
+ }
+
+ /**
+ * Test that after sending a message with the disableMessageID hint set, the message
+ * object has a null JMSMessageID value, and no message-id field value was set.
+ */
+ @Test(timeout = 5000)
+ public void testSendingMessageWithDisableMessageIDHint() throws Exception {
+ try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin(true);
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String queueName = "myQueue";
+ Queue queue = session.createQueue(queueName);
+ MessageProducer producer = session.createProducer(queue);
+
+ String text = "myMessage";
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
+ propsMatcher.withMessageId(nullValue()); // Check there is no message-id value;
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+ messageMatcher.setPropertiesMatcher(propsMatcher);
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+ testPeer.expectTransfer(messageMatcher);
+
+ Message message = session.createTextMessage(text);
+
+ assertNull("JMSMessageID should not yet be set", message.getJMSMessageID());
+
+ producer.setDisableMessageID(true);
+
+ producer.send(message);
+ testPeer.waitForAllHandlersToComplete(1000);
+
+ assertNull("JMSMessageID should still not yet be set", message.getJMSMessageID());
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testRemotelyCloseProducer() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+
+ testPeer.expectBegin(true);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create a producer, then remotely end it afterwards.
+ testPeer.expectSenderAttach();
+ testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true);
+
+ Queue queue = session.createQueue("myQueue");
+ final MessageProducer producer = session.createProducer(queue);
+
+ // Verify the producer gets marked closed
+ testPeer.waitForAllHandlersToComplete(1000);
+ assertTrue("producer never closed.", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ try {
+ producer.getDestination();
+ } catch (IllegalStateException jmsise) {
+ return true;
+ }
+ return false;
+ }
+ }, 2000, 10));
+
+ // Try closing it explicitly, should effectively no-op in client.
+ // The test peer will throw during close if it sends anything.
+ producer.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8e5662ed/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SenderIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SenderIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SenderIntegrationTest.java
deleted file mode 100644
index e6738d1..0000000
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SenderIntegrationTest.java
+++ /dev/null
@@ -1,556 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.jms.integration;
-
-import static org.hamcrest.Matchers.both;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.isA;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Date;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.IllegalStateException;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
-import org.apache.qpid.jms.test.QpidJmsTestCase;
-import org.apache.qpid.jms.test.Wait;
-import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
-import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
-import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
-import org.apache.qpid.proton.amqp.UnsignedByte;
-import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.hamcrest.Matcher;
-import org.junit.Test;
-
-public class SenderIntegrationTest extends QpidJmsTestCase {
- private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
-
- @Test(timeout = 10000)
- public void testCloseSender() throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- testPeer.expectBegin(true);
- testPeer.expectSenderAttach();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue("myQueue");
- MessageProducer producer = session.createProducer(queue);
-
- testPeer.expectDetach(true, true, true);
- producer.close();
-
- testPeer.waitForAllHandlersToComplete(1000);
- }
- }
-
- @Test(timeout = 10000)
- public void testDefaultDeliveryModeProducesDurableMessages() throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- testPeer.expectBegin(true);
- testPeer.expectSenderAttach();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue("myQueue");
- MessageProducer producer = session.createProducer(queue);
-
- // Create and transfer a new message
- MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true)
- .withDurable(equalTo(true));
- MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
- TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
- messageMatcher.setHeadersMatcher(headersMatcher);
- messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
- testPeer.expectTransfer(messageMatcher);
-
- Message message = session.createTextMessage();
-
- producer.send(message);
- assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
-
- testPeer.waitForAllHandlersToComplete(1000);
- }
- }
-
- @Test(timeout = 10000)
- public void testProducerOverridesMessageDeliveryMode() throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- testPeer.expectBegin(true);
- testPeer.expectSenderAttach();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue("myQueue");
- MessageProducer producer = session.createProducer(queue);
-
- // Create and transfer a new message, explicitly setting the deliveryMode on the
- // message (which applications shouldn't) to NON_PERSISTENT and sending it to check
- // that the producer ignores this value and sends the message as PERSISTENT(/durable)
- MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true)
- .withDurable(equalTo(true));
- MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
- TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
- messageMatcher.setHeadersMatcher(headersMatcher);
- messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
- testPeer.expectTransfer(messageMatcher);
-
- Message message = session.createTextMessage();
- message.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
- assertEquals(DeliveryMode.NON_PERSISTENT, message.getJMSDeliveryMode());
-
- producer.send(message);
-
- assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
-
- testPeer.waitForAllHandlersToComplete(1000);
- }
- }
-
- /**
- * Test that when a message is sent the JMSDestination header is set to
- * the Destination used by the producer.
- */
- @Test(timeout = 5000)
- public void testSendingMessageSetsJMSDestination() throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- testPeer.expectBegin(true);
- testPeer.expectSenderAttach();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- String queueName = "myQueue";
- Queue queue = session.createQueue(queueName);
- MessageProducer producer = session.createProducer(queue);
-
- String text = "myMessage";
- MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
- MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
- MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
- TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
- messageMatcher.setHeadersMatcher(headersMatcher);
- messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
- messageMatcher.setPropertiesMatcher(propsMatcher);
- messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
- testPeer.expectTransfer(messageMatcher);
-
- Message message = session.createTextMessage(text);
-
- assertNull("Should not yet have a JMSDestination", message.getJMSDestination());
-
- producer.send(message);
-
- assertEquals("Should have had JMSDestination set", queue, message.getJMSDestination());
-
- testPeer.waitForAllHandlersToComplete(1000);
- }
- }
-
- @Test(timeout = 10000)
- public void testSendingMessageSetsJMSTimestamp() throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- testPeer.expectBegin(true);
- testPeer.expectSenderAttach();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- String queueName = "myQueue";
- Queue queue = session.createQueue(queueName);
- MessageProducer producer = session.createProducer(queue);
-
- // Create matcher to expect the absolute-expiry-time field of the properties section to
- // be set to a value greater than 'now'+ttl, within a delta.
- long currentTime = System.currentTimeMillis();
- Date creationLower = new Date(currentTime);
- Date creationUpper = new Date(currentTime + 3000);
- Matcher<Date> inRange = both(greaterThanOrEqualTo(creationLower)).and(lessThanOrEqualTo(creationUpper));
-
- String text = "myMessage";
- MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true)
- .withDurable(equalTo(true));
- MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
- MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true)
- .withCreationTime(inRange);
- TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
- messageMatcher.setHeadersMatcher(headersMatcher);
- messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
- messageMatcher.setPropertiesMatcher(propsMatcher);
- messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
- testPeer.expectTransfer(messageMatcher);
-
- Message message = session.createTextMessage(text);
-
- producer.send(message);
-
- testPeer.waitForAllHandlersToComplete(1000);
- }
- }
-
- /**
- * Test that after sending a message with the disableMessageTimestamp hint set, the
- * message object has a 0 JMSTimestamp value, and no creation-time field value was set.
- */
- @Test(timeout = 5000)
- public void testSendingMessageWithDisableMessageTimestampHint() throws Exception {
- try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- testPeer.expectBegin(true);
- testPeer.expectSenderAttach();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- String queueName = "myQueue";
- Queue queue = session.createQueue(queueName);
- MessageProducer producer = session.createProducer(queue);
-
- String text = "myMessage";
- MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
- MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
- MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
- propsMatcher.withCreationTime(nullValue()); // Check there is no creation-time value;
- TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
- messageMatcher.setHeadersMatcher(headersMatcher);
- messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
- messageMatcher.setPropertiesMatcher(propsMatcher);
- messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
- testPeer.expectTransfer(messageMatcher);
-
- Message message = session.createTextMessage(text);
-
- assertEquals("JMSTimestamp should not yet be set", 0, message.getJMSTimestamp());
-
- producer.setDisableMessageTimestamp(true);
-
- producer.send(message);
- testPeer.waitForAllHandlersToComplete(1000);
-
- assertEquals("JMSTimestamp should still not be set", 0, message.getJMSTimestamp());
- }
- }
-
- @Test(timeout = 10000)
- public void testSendingMessageSetsJMSExpirationRelatedAbsoluteExpiryAndTtlFields() throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- testPeer.expectBegin(true);
- testPeer.expectSenderAttach();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- String queueName = "myQueue";
- Queue queue = session.createQueue(queueName);
- MessageProducer producer = session.createProducer(queue);
-
- long currentTime = System.currentTimeMillis();
- long ttl = 100_000;
-
- Date expirationLower = new Date(currentTime + ttl);
- Date expirationUpper = new Date(currentTime + ttl + 3000);
-
- // Create matcher to expect the absolute-expiry-time field of the properties section to
- // be set to a value greater than 'now'+ttl, within a delta.
- Matcher<Date> inRange = both(greaterThanOrEqualTo(expirationLower)).and(lessThanOrEqualTo(expirationUpper));
-
- String text = "myMessage";
- MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
- headersMatcher.withDurable(equalTo(true));
- headersMatcher.withTtl(equalTo(UnsignedInteger.valueOf(ttl)));
- MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
- MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true)
- .withAbsoluteExpiryTime(inRange);
- TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
- messageMatcher.setHeadersMatcher(headersMatcher);
- messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
- messageMatcher.setPropertiesMatcher(propsMatcher);
- messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
- testPeer.expectTransfer(messageMatcher);
-
- Message message = session.createTextMessage(text);
-
- producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, ttl);
-
- testPeer.waitForAllHandlersToComplete(1000);
- }
- }
-
- @Test(timeout = 10000)
- public void testSendingMessageWithJMS_AMQP_TTLSetPositive() throws Exception {
- sendingMessageWithJMS_AMQP_TTLSetTestImpl(100_000, 20_000);
- }
-
- @Test(timeout = 10000)
- public void testSendingMessageWithJMS_AMQP_TTLSetZero() throws Exception {
- sendingMessageWithJMS_AMQP_TTLSetTestImpl(50_000, 0);
- }
-
- public void sendingMessageWithJMS_AMQP_TTLSetTestImpl(long jmsTtl, long amqpTtl) throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- testPeer.expectBegin(true);
- testPeer.expectSenderAttach();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- String queueName = "myQueue";
- Queue queue = session.createQueue(queueName);
- MessageProducer producer = session.createProducer(queue);
-
- long currentTime = System.currentTimeMillis();
- Date expirationLower = new Date(currentTime + jmsTtl);
- Date expirationUpper = new Date(currentTime + jmsTtl + 3000);
-
- // Create matcher to expect the absolute-expiry-time field of the properties section to
- // be set to a value greater than 'now'+ttl, within a delta.
- Matcher<Date> inRange = both(greaterThanOrEqualTo(expirationLower)).and(lessThanOrEqualTo(expirationUpper));
-
- String text = "myMessage";
- MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
- headersMatcher.withDurable(equalTo(true));
- // verify the ttl field matches the JMS_AMQP_TTL value, rather than the standard JMS send TTL value.
- if (amqpTtl == 0) {
- headersMatcher.withTtl(nullValue());
- } else {
- headersMatcher.withTtl(equalTo(UnsignedInteger.valueOf(amqpTtl)));
- }
- MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
- MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true)
- .withAbsoluteExpiryTime(inRange);
- TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
- messageMatcher.setHeadersMatcher(headersMatcher);
- messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
- messageMatcher.setPropertiesMatcher(propsMatcher);
- messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
- testPeer.expectTransfer(messageMatcher);
-
- Message message = session.createTextMessage(text);
- message.setLongProperty(AmqpMessageSupport.JMS_AMQP_TTL, amqpTtl);
-
- producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, jmsTtl);
-
- testPeer.waitForAllHandlersToComplete(2000);
- }
- }
-
- /**
- * Test that when a message is sent with default priority of 4, the emitted AMQP message has no value in the header
- * priority field, since the default for that field is already 4.
- */
- @Test(timeout = 10000)
- public void testDefaultPriorityProducesMessagesWithoutPriorityField() throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- testPeer.expectBegin(true);
- testPeer.expectSenderAttach();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue("myQueue");
- MessageProducer producer = session.createProducer(queue);
-
- // Create and transfer a new message
- MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true)
- .withPriority(equalTo(null));
- MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
- TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
- messageMatcher.setHeadersMatcher(headersMatcher);
- messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
- testPeer.expectTransfer(messageMatcher);
-
- Message message = session.createTextMessage();
-
- assertEquals(Message.DEFAULT_PRIORITY, message.getJMSPriority());
-
- producer.send(message);
-
- assertEquals(Message.DEFAULT_PRIORITY, message.getJMSPriority());
-
- testPeer.waitForAllHandlersToComplete(1000);
- }
- }
-
- /**
- * Test that when a message is sent with a non-default priority, the emitted AMQP message has that value in the
- * header priority field, and the JMS message has had JMSPriority set.
- */
- @Test(timeout = 10000)
- public void testNonDefaultPriorityProducesMessagesWithPriorityFieldAndSetsJMSPriority() throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- testPeer.expectBegin(true);
- testPeer.expectSenderAttach();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue("myQueue");
- MessageProducer producer = session.createProducer(queue);
-
- byte priority = 5;
-
- // Create and transfer a new message
- MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true)
- .withPriority(equalTo(UnsignedByte.valueOf(priority)));
- MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
- TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
- messageMatcher.setHeadersMatcher(headersMatcher);
- messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
- testPeer.expectTransfer(messageMatcher);
-
- Message message = session.createTextMessage();
-
- assertEquals(Message.DEFAULT_PRIORITY, message.getJMSPriority());
-
- producer.send(message, DeliveryMode.PERSISTENT, priority, Message.DEFAULT_TIME_TO_LIVE);
-
- assertEquals(priority, message.getJMSPriority());
-
- testPeer.waitForAllHandlersToComplete(1000);
- }
- }
-
- /**
- * Test that upon sending a message, the sender sets the JMSMessageID on the Message object,
- * and that the value is included in the AMQP message sent by the client.
- */
- @Test(timeout = 10000)
- public void testSendingMessageSetsJMSMessageID() throws Exception {
- try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- testPeer.expectBegin(true);
- testPeer.expectSenderAttach();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- String queueName = "myQueue";
- Queue queue = session.createQueue(queueName);
- MessageProducer producer = session.createProducer(queue);
-
- String text = "myMessage";
- MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
- MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
- MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withMessageId(isA(String.class));
- TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
- messageMatcher.setHeadersMatcher(headersMatcher);
- messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
- messageMatcher.setPropertiesMatcher(propsMatcher);
- messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
- testPeer.expectTransfer(messageMatcher);
-
- Message message = session.createTextMessage(text);
-
- assertNull("JMSMessageID should not yet be set", message.getJMSMessageID());
-
- producer.send(message);
-
- String jmsMessageID = message.getJMSMessageID();
- assertNotNull("JMSMessageID should be set", jmsMessageID);
- assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:"));
-
- //Get the value that was actually transmitted/received, verify it is a string, compare to what we have locally
- testPeer.waitForAllHandlersToComplete(1000);
- Object receivedMessageId = propsMatcher.getReceivedMessageId();
-
- assertTrue("Expected string message id to be sent", receivedMessageId instanceof String);
- assertTrue("Expected JMSMessageId value to be present in AMQP message", jmsMessageID.endsWith((String)receivedMessageId));
- }
- }
-
- /**
- * Test that after sending a message with the disableMessageID hint set, the message
- * object has a null JMSMessageID value, and no message-id field value was set.
- */
- @Test(timeout = 5000)
- public void testSendingMessageWithDisableMessageIDHint() throws Exception {
- try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
- testPeer.expectBegin(true);
- testPeer.expectSenderAttach();
-
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- String queueName = "myQueue";
- Queue queue = session.createQueue(queueName);
- MessageProducer producer = session.createProducer(queue);
-
- String text = "myMessage";
- MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
- MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
- MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
- propsMatcher.withMessageId(nullValue()); // Check there is no message-id value;
- TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
- messageMatcher.setHeadersMatcher(headersMatcher);
- messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
- messageMatcher.setPropertiesMatcher(propsMatcher);
- messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
- testPeer.expectTransfer(messageMatcher);
-
- Message message = session.createTextMessage(text);
-
- assertNull("JMSMessageID should not yet be set", message.getJMSMessageID());
-
- producer.setDisableMessageID(true);
-
- producer.send(message);
- testPeer.waitForAllHandlersToComplete(1000);
-
- assertNull("JMSMessageID should still not yet be set", message.getJMSMessageID());
- }
- }
-
- @Test(timeout = 5000)
- public void testRemotelyCloseProducer() throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- Connection connection = testFixture.establishConnecton(testPeer);
-
- testPeer.expectBegin(true);
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- // Create a producer, then remotely end it afterwards.
- testPeer.expectSenderAttach();
- testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true);
-
- Queue queue = session.createQueue("myQueue");
- final MessageProducer producer = session.createProducer(queue);
-
- // Verify the producer gets marked closed
- testPeer.waitForAllHandlersToComplete(1000);
- assertTrue("producer never closed.", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- try {
- producer.getDestination();
- } catch (IllegalStateException jmsise) {
- return true;
- }
- return false;
- }
- }, 2000, 10));
-
- // Try closing it explicitly, should effectively no-op in client.
- // The test peer will throw during close if it sends anything.
- producer.close();
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/4] qpid-jms git commit: add test of producer behaviour on link
which is remotely closed
Posted by ro...@apache.org.
add test of producer behaviour on link which is remotely closed
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f7868684
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f7868684
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f7868684
Branch: refs/heads/master
Commit: f78686848eee4bcd3f61c82c72ccfc623663416f
Parents: b82f427
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Mar 2 11:06:43 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Mar 2 11:06:43 2015 +0000
----------------------------------------------------------------------
.../jms/integration/SenderIntegrationTest.java | 37 ++++++++++++++++++++
1 file changed, 37 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f7868684/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SenderIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SenderIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SenderIntegrationTest.java
index dff1a6d..e6738d1 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SenderIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SenderIntegrationTest.java
@@ -33,6 +33,7 @@ import java.util.Date;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.IllegalStateException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
@@ -40,6 +41,7 @@ import javax.jms.Session;
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.Wait;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
@@ -516,4 +518,39 @@ public class SenderIntegrationTest extends QpidJmsTestCase {
assertNull("JMSMessageID should still not yet be set", message.getJMSMessageID());
}
}
+
+ @Test(timeout = 5000)
+ public void testRemotelyCloseProducer() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+
+ testPeer.expectBegin(true);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create a producer, then remotely end it afterwards.
+ testPeer.expectSenderAttach();
+ testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true);
+
+ Queue queue = session.createQueue("myQueue");
+ final MessageProducer producer = session.createProducer(queue);
+
+ // Verify the producer gets marked closed
+ testPeer.waitForAllHandlersToComplete(1000);
+ assertTrue("producer never closed.", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ try {
+ producer.getDestination();
+ } catch (IllegalStateException jmsise) {
+ return true;
+ }
+ return false;
+ }
+ }, 2000, 10));
+
+ // Try closing it explicitly, should effectively no-op in client.
+ // The test peer will throw during close if it sends anything.
+ producer.close();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/4] qpid-jms git commit: add test of consumer behaviour on link
which is remotely closed
Posted by ro...@apache.org.
add test of consumer behaviour on link which is remotely closed
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/3ebbe55d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/3ebbe55d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/3ebbe55d
Branch: refs/heads/master
Commit: 3ebbe55d6d3ea0fe777c0cdc071dc59afade426b
Parents: f786868
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Mar 2 11:11:06 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Mar 2 11:11:06 2015 +0000
----------------------------------------------------------------------
.../integration/ConsumerIntegrationTest.java | 99 ++++++++++++++++++++
1 file changed, 99 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3ebbe55d/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
new file mode 100644
index 0000000..7ab8844
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import javax.jms.Connection;
+import javax.jms.IllegalStateException;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.Wait;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.junit.Test;
+
+public class ConsumerIntegrationTest extends QpidJmsTestCase {
+ private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+ @Test(timeout = 5000)
+ public void testCloseConsumer() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ testPeer.expectBegin(true);
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlow();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ testPeer.expectDetach(true, true, true);
+ consumer.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testRemotelyCloseConsumer() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+
+ testPeer.expectBegin(true);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create a consumer, then remotely end it afterwards.
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlow();
+ testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true);
+
+ Queue queue = session.createQueue("myQueue");
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ // Verify the consumer gets marked closed
+ testPeer.waitForAllHandlersToComplete(1000);
+ assertTrue("consumer never closed.", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ try {
+ consumer.getMessageListener();
+ } catch (IllegalStateException jmsise) {
+ return true;
+ }
+ return false;
+ }
+ }, 2000, 10));
+
+ try {
+ consumer.getMessageListener();
+ fail("Expected ISE to be thrown due to being closed");
+ } catch (IllegalStateException jmsise) {
+ // expected
+ }
+
+ // Try closing it explicitly, should effectively no-op in client.
+ // The test peer will throw during close if it sends anything.
+ consumer.close();
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org